You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ma...@apache.org on 2017/08/30 17:53:10 UTC
nifi git commit: NIFI-4319 - Fixed ArrayIndexOutOfBoundsException in
QueryCassandra
Repository: nifi
Updated Branches:
refs/heads/master 3de0b8edf -> a53a37f9c
NIFI-4319 - Fixed ArrayIndexOutOfBoundsException in QueryCassandra
Signed-off-by: Matthew Burgess <ma...@apache.org>
This closes #2112
Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/a53a37f9
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/a53a37f9
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/a53a37f9
Branch: refs/heads/master
Commit: a53a37f9cabab6a3d52018cde88aceebf994c7c3
Parents: 3de0b8e
Author: Pierre Villard <pi...@gmail.com>
Authored: Sat Aug 26 15:51:12 2017 +0200
Committer: Matthew Burgess <ma...@apache.org>
Committed: Wed Aug 30 13:52:05 2017 -0400
----------------------------------------------------------------------
.../processors/cassandra/QueryCassandra.java | 7 ++-
.../cassandra/CassandraQueryTestUtil.java | 45 +++++++++++++++++++-
.../cassandra/QueryCassandraTest.java | 8 ++++
3 files changed, 58 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/nifi/blob/a53a37f9/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/main/java/org/apache/nifi/processors/cassandra/QueryCassandra.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/main/java/org/apache/nifi/processors/cassandra/QueryCassandra.java b/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/main/java/org/apache/nifi/processors/cassandra/QueryCassandra.java
index 7387334..52eb9e0 100644
--- a/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/main/java/org/apache/nifi/processors/cassandra/QueryCassandra.java
+++ b/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/main/java/org/apache/nifi/processors/cassandra/QueryCassandra.java
@@ -40,6 +40,7 @@ import org.apache.nifi.annotation.lifecycle.OnShutdown;
import org.apache.nifi.annotation.lifecycle.OnUnscheduled;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
@@ -263,6 +264,10 @@ public class QueryCassandra extends AbstractCassandraProcessor {
// set attribute how many rows were selected
fileToProcess = session.putAttribute(fileToProcess, RESULT_ROW_COUNT, String.valueOf(nrOfRows.get()));
+ // set mime.type based on output format
+ fileToProcess = session.putAttribute(fileToProcess, CoreAttributes.MIME_TYPE.key(),
+ JSON_FORMAT.equals(outputFormat) ? "application/json" : "application/avro-binary");
+
logger.info("{} contains {} Avro records; transferring to 'success'",
new Object[]{fileToProcess, nrOfRows.get()});
session.getProvenanceReporter().modifyContent(fileToProcess, "Retrieved " + nrOfRows.get() + " rows",
@@ -510,7 +515,7 @@ public class QueryCassandra extends AbstractCassandraProcessor {
final int nrOfColumns = (columnDefinitions == null ? 0 : columnDefinitions.size());
String tableName = "NiFi_Cassandra_Query_Record";
if (nrOfColumns > 0) {
- String tableNameFromMeta = columnDefinitions.getTable(1);
+ String tableNameFromMeta = columnDefinitions.getTable(0);
if (!StringUtils.isBlank(tableNameFromMeta)) {
tableName = tableNameFromMeta;
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/a53a37f9/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/test/java/org/apache/nifi/processors/cassandra/CassandraQueryTestUtil.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/test/java/org/apache/nifi/processors/cassandra/CassandraQueryTestUtil.java b/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/test/java/org/apache/nifi/processors/cassandra/CassandraQueryTestUtil.java
index 0d5571e..dbe2e1e 100644
--- a/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/test/java/org/apache/nifi/processors/cassandra/CassandraQueryTestUtil.java
+++ b/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/test/java/org/apache/nifi/processors/cassandra/CassandraQueryTestUtil.java
@@ -61,7 +61,7 @@ public class CassandraQueryTestUtil {
}
});
- when(columnDefinitions.getTable(1)).thenReturn("users");
+ when(columnDefinitions.getTable(0)).thenReturn("users");
when(columnDefinitions.getType(anyInt())).thenAnswer(new Answer<DataType>() {
@@ -103,6 +103,43 @@ public class CassandraQueryTestUtil {
return resultSet;
}
+ public static ResultSet createMockResultSetOneColumn() throws Exception { // Builds a mocked ResultSet with one text column ("user_id") and two rows.
+ ResultSet resultSet = mock(ResultSet.class);
+ ColumnDefinitions columnDefinitions = mock(ColumnDefinitions.class);
+ when(columnDefinitions.size()).thenReturn(1); // exactly one column is reported
+ when(columnDefinitions.getName(anyInt())).thenAnswer(new Answer<String>() {
+ List<String> colNames = Arrays.asList("user_id");
+ @Override
+ public String answer(InvocationOnMock invocationOnMock) throws Throwable {
+ return colNames.get((Integer) invocationOnMock.getArguments()[0]); // name looked up by the index passed to getName
+
+ }
+ });
+
+ when(columnDefinitions.getTable(0)).thenReturn("users"); // table name is stubbed for column index 0 only
+
+ when(columnDefinitions.getType(anyInt())).thenAnswer(new Answer<DataType>() {
+ List<DataType> dataTypes = Arrays.asList(DataType.text());
+ @Override
+ public DataType answer(InvocationOnMock invocationOnMock) throws Throwable {
+ return dataTypes.get((Integer) invocationOnMock.getArguments()[0]); // type looked up by the index passed to getType
+
+ }
+ });
+
+ List<Row> rows = Arrays.asList(
+ createRow("user1"),
+ createRow("user2")
+ );
+
+ when(resultSet.iterator()).thenReturn(rows.iterator()); // one Iterator instance is created here and handed out on every iterator() call
+ when(resultSet.all()).thenReturn(rows);
+ when(resultSet.getAvailableWithoutFetching()).thenReturn(rows.size());
+ when(resultSet.isFullyFetched()).thenReturn(false).thenReturn(true); // consecutive stubbing: false on the first call, true afterwards
+ when(resultSet.getColumnDefinitions()).thenReturn(columnDefinitions);
+ return resultSet;
+ }
+
public static Row createRow(String user_id, String first_name, String last_name, Set<String> emails,
List<String> top_places, Map<Date, String> todo, boolean registered,
float scale, double metric) {
@@ -119,4 +156,10 @@ public class CassandraQueryTestUtil {
return row;
}
+
+ public static Row createRow(String user_id) { // Single-column overload: mocks a Row for a result with only the user_id column.
+ Row row = mock(Row.class);
+ when(row.getString(0)).thenReturn(user_id); // only getString(0) is stubbed; it returns the supplied user_id
+ return row;
+ }
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/a53a37f9/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/test/java/org/apache/nifi/processors/cassandra/QueryCassandraTest.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/test/java/org/apache/nifi/processors/cassandra/QueryCassandraTest.java b/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/test/java/org/apache/nifi/processors/cassandra/QueryCassandraTest.java
index 5cd54e9..83110c3 100644
--- a/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/test/java/org/apache/nifi/processors/cassandra/QueryCassandraTest.java
+++ b/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/test/java/org/apache/nifi/processors/cassandra/QueryCassandraTest.java
@@ -241,6 +241,14 @@ public class QueryCassandraTest {
}
@Test
+ public void testCreateSchemaOneColumn() throws Exception { // Verifies createSchema handles a result set that has a single column.
+ ResultSet rs = CassandraQueryTestUtil.createMockResultSetOneColumn();
+ Schema schema = QueryCassandra.createSchema(rs);
+ assertNotNull(schema);
+ assertEquals("users", schema.getName()); // expected value first, per JUnit's assertEquals(expected, actual)
+ }
+ }
+
+ @Test
public void testCreateSchema() throws Exception {
ResultSet rs = CassandraQueryTestUtil.createMockResultSet();
Schema schema = QueryCassandra.createSchema(rs);