You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ma...@apache.org on 2017/08/30 17:53:10 UTC

nifi git commit: NIFI-4319 - Fixed ArrayIndexOutOfBoundsException in QueryCassandra

Repository: nifi
Updated Branches:
  refs/heads/master 3de0b8edf -> a53a37f9c


NIFI-4319 - Fixed ArrayIndexOutOfBoundsException in QueryCassandra

Signed-off-by: Matthew Burgess <ma...@apache.org>

This closes #2112


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/a53a37f9
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/a53a37f9
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/a53a37f9

Branch: refs/heads/master
Commit: a53a37f9cabab6a3d52018cde88aceebf994c7c3
Parents: 3de0b8e
Author: Pierre Villard <pi...@gmail.com>
Authored: Sat Aug 26 15:51:12 2017 +0200
Committer: Matthew Burgess <ma...@apache.org>
Committed: Wed Aug 30 13:52:05 2017 -0400

----------------------------------------------------------------------
 .../processors/cassandra/QueryCassandra.java    |  7 ++-
 .../cassandra/CassandraQueryTestUtil.java       | 45 +++++++++++++++++++-
 .../cassandra/QueryCassandraTest.java           |  8 ++++
 3 files changed, 58 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/a53a37f9/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/main/java/org/apache/nifi/processors/cassandra/QueryCassandra.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/main/java/org/apache/nifi/processors/cassandra/QueryCassandra.java b/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/main/java/org/apache/nifi/processors/cassandra/QueryCassandra.java
index 7387334..52eb9e0 100644
--- a/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/main/java/org/apache/nifi/processors/cassandra/QueryCassandra.java
+++ b/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/main/java/org/apache/nifi/processors/cassandra/QueryCassandra.java
@@ -40,6 +40,7 @@ import org.apache.nifi.annotation.lifecycle.OnShutdown;
 import org.apache.nifi.annotation.lifecycle.OnUnscheduled;
 import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
 import org.apache.nifi.annotation.behavior.WritesAttribute;
 import org.apache.nifi.annotation.behavior.WritesAttributes;
 import org.apache.nifi.annotation.lifecycle.OnScheduled;
@@ -263,6 +264,10 @@ public class QueryCassandra extends AbstractCassandraProcessor {
             // set attribute how many rows were selected
             fileToProcess = session.putAttribute(fileToProcess, RESULT_ROW_COUNT, String.valueOf(nrOfRows.get()));
 
+            // set mime.type based on output format
+            fileToProcess = session.putAttribute(fileToProcess, CoreAttributes.MIME_TYPE.key(),
+                    JSON_FORMAT.equals(outputFormat) ? "application/json" : "application/avro-binary");
+
             logger.info("{} contains {} Avro records; transferring to 'success'",
                     new Object[]{fileToProcess, nrOfRows.get()});
             session.getProvenanceReporter().modifyContent(fileToProcess, "Retrieved " + nrOfRows.get() + " rows",
@@ -510,7 +515,7 @@ public class QueryCassandra extends AbstractCassandraProcessor {
         final int nrOfColumns = (columnDefinitions == null ? 0 : columnDefinitions.size());
         String tableName = "NiFi_Cassandra_Query_Record";
         if (nrOfColumns > 0) {
-            String tableNameFromMeta = columnDefinitions.getTable(1);
+            String tableNameFromMeta = columnDefinitions.getTable(0);
             if (!StringUtils.isBlank(tableNameFromMeta)) {
                 tableName = tableNameFromMeta;
             }

http://git-wip-us.apache.org/repos/asf/nifi/blob/a53a37f9/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/test/java/org/apache/nifi/processors/cassandra/CassandraQueryTestUtil.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/test/java/org/apache/nifi/processors/cassandra/CassandraQueryTestUtil.java b/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/test/java/org/apache/nifi/processors/cassandra/CassandraQueryTestUtil.java
index 0d5571e..dbe2e1e 100644
--- a/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/test/java/org/apache/nifi/processors/cassandra/CassandraQueryTestUtil.java
+++ b/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/test/java/org/apache/nifi/processors/cassandra/CassandraQueryTestUtil.java
@@ -61,7 +61,7 @@ public class CassandraQueryTestUtil {
             }
         });
 
-        when(columnDefinitions.getTable(1)).thenReturn("users");
+        when(columnDefinitions.getTable(0)).thenReturn("users");
 
         when(columnDefinitions.getType(anyInt())).thenAnswer(new Answer<DataType>() {
 
@@ -103,6 +103,43 @@ public class CassandraQueryTestUtil {
         return resultSet;
     }
 
+    public static ResultSet createMockResultSetOneColumn() throws Exception {
+        ResultSet resultSet = mock(ResultSet.class);
+        ColumnDefinitions columnDefinitions = mock(ColumnDefinitions.class);
+        when(columnDefinitions.size()).thenReturn(1);
+        when(columnDefinitions.getName(anyInt())).thenAnswer(new Answer<String>() {
+            List<String> colNames = Arrays.asList("user_id");
+            @Override
+            public String answer(InvocationOnMock invocationOnMock) throws Throwable {
+                return colNames.get((Integer) invocationOnMock.getArguments()[0]);
+
+            }
+        });
+
+        when(columnDefinitions.getTable(0)).thenReturn("users");
+
+        when(columnDefinitions.getType(anyInt())).thenAnswer(new Answer<DataType>() {
+            List<DataType> dataTypes = Arrays.asList(DataType.text());
+            @Override
+            public DataType answer(InvocationOnMock invocationOnMock) throws Throwable {
+                return dataTypes.get((Integer) invocationOnMock.getArguments()[0]);
+
+            }
+        });
+
+        List<Row> rows = Arrays.asList(
+                createRow("user1"),
+                createRow("user2")
+        );
+
+        when(resultSet.iterator()).thenReturn(rows.iterator());
+        when(resultSet.all()).thenReturn(rows);
+        when(resultSet.getAvailableWithoutFetching()).thenReturn(rows.size());
+        when(resultSet.isFullyFetched()).thenReturn(false).thenReturn(true);
+        when(resultSet.getColumnDefinitions()).thenReturn(columnDefinitions);
+        return resultSet;
+    }
+
     public static Row createRow(String user_id, String first_name, String last_name, Set<String> emails,
                                 List<String> top_places, Map<Date, String> todo, boolean registered,
                                 float scale, double metric) {
@@ -119,4 +156,10 @@ public class CassandraQueryTestUtil {
 
         return row;
     }
+
+    public static Row createRow(String user_id) {
+        Row row = mock(Row.class);
+        when(row.getString(0)).thenReturn(user_id);
+        return row;
+    }
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/a53a37f9/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/test/java/org/apache/nifi/processors/cassandra/QueryCassandraTest.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/test/java/org/apache/nifi/processors/cassandra/QueryCassandraTest.java b/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/test/java/org/apache/nifi/processors/cassandra/QueryCassandraTest.java
index 5cd54e9..83110c3 100644
--- a/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/test/java/org/apache/nifi/processors/cassandra/QueryCassandraTest.java
+++ b/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/test/java/org/apache/nifi/processors/cassandra/QueryCassandraTest.java
@@ -241,6 +241,14 @@ public class QueryCassandraTest {
     }
 
     @Test
+    public void testCreateSchemaOneColumn() throws Exception {
+        ResultSet rs = CassandraQueryTestUtil.createMockResultSetOneColumn();
+        Schema schema = QueryCassandra.createSchema(rs);
+        assertNotNull(schema);
+        assertEquals(schema.getName(), "users");
+    }
+
+    @Test
     public void testCreateSchema() throws Exception {
         ResultSet rs = CassandraQueryTestUtil.createMockResultSet();
         Schema schema = QueryCassandra.createSchema(rs);