You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ma...@apache.org on 2016/09/21 15:18:17 UTC
nifi git commit: NIFI-2749
Repository: nifi
Updated Branches:
refs/heads/master fda15d916 -> 938c7cccb
NIFI-2749
Signed-off-by: Matt Burgess <ma...@apache.org>
This closes #997
Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/938c7ccc
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/938c7ccc
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/938c7ccc
Branch: refs/heads/master
Commit: 938c7cccb8b251d4a1390cf0078b14f53bc94a57
Parents: fda15d9
Author: Peter Wicks <PW...@MICRON.COM>
Authored: Thu Sep 8 21:37:21 2016 -0600
Committer: Matt Burgess <ma...@apache.org>
Committed: Wed Sep 21 11:16:57 2016 -0400
----------------------------------------------------------------------
.../processors/standard/QueryDatabaseTable.java | 17 +++++++++--------
.../standard/QueryDatabaseTableTest.java | 13 +++++++++++--
2 files changed, 20 insertions(+), 10 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/nifi/blob/938c7ccc/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryDatabaseTable.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryDatabaseTable.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryDatabaseTable.java
index 31bec27..278cc30 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryDatabaseTable.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryDatabaseTable.java
@@ -270,11 +270,6 @@ public class QueryDatabaseTable extends AbstractDatabaseFetchProcessor {
fileToProcess = session.putAttribute(fileToProcess, "fragment.index", String.valueOf(fragmentIndex));
}
- // Add maximum values as attributes
- for (Map.Entry<String, String> entry : statePropertyMap.entrySet()) {
- fileToProcess = session.putAttribute(fileToProcess, "maxvalue." + entry.getKey(), entry.getValue());
- }
-
logger.info("{} contains {} Avro records; transferring to 'success'",
new Object[]{fileToProcess, nrOfRows.get()});
@@ -290,13 +285,19 @@ public class QueryDatabaseTable extends AbstractDatabaseFetchProcessor {
fragmentIndex++;
}
- //set count on all FlowFiles
- if(maxRowsPerFlowFile > 0) {
- for (int i = 0; i < resultSetFlowFiles.size(); i++) {
+ for (int i = 0; i < resultSetFlowFiles.size(); i++) {
+ // Add maximum values as attributes
+ for (Map.Entry<String, String> entry : statePropertyMap.entrySet()) {
+ resultSetFlowFiles.set(i, session.putAttribute(resultSetFlowFiles.get(i), "maxvalue." + entry.getKey(), entry.getValue()));
+ }
+
+ //set count on all FlowFiles
+ if(maxRowsPerFlowFile > 0) {
resultSetFlowFiles.set(i,
session.putAttribute(resultSetFlowFiles.get(i), "fragment.count", Integer.toString(fragmentIndex)));
}
}
+
} catch (final SQLException e) {
throw e;
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/938c7ccc/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/QueryDatabaseTableTest.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/QueryDatabaseTableTest.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/QueryDatabaseTableTest.java
index 974a835..f3904ef 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/QueryDatabaseTableTest.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/QueryDatabaseTableTest.java
@@ -184,15 +184,21 @@ public class QueryDatabaseTableTest {
runner.setProperty(QueryDatabaseTable.TABLE_NAME, "TEST_QUERY_DB_TABLE");
runner.setIncomingConnection(false);
runner.setProperty(QueryDatabaseTable.MAX_VALUE_COLUMN_NAMES, "ID");
+ runner.setProperty(QueryDatabaseTable.MAX_ROWS_PER_FLOW_FILE,"2");
runner.run();
- runner.assertAllFlowFilesTransferred(QueryDatabaseTable.REL_SUCCESS, 1);
+ runner.assertAllFlowFilesTransferred(QueryDatabaseTable.REL_SUCCESS, 2);
MockFlowFile flowFile = runner.getFlowFilesForRelationship(QueryDatabaseTable.REL_SUCCESS).get(0);
assertEquals(flowFile.getAttribute("maxvalue.id"), "2");
InputStream in = new ByteArrayInputStream(flowFile.toByteArray());
runner.setProperty(QueryDatabaseTable.FETCH_SIZE, "2");
- assertEquals(3, getNumberOfRecordsFromStream(in));
+ assertEquals(2, getNumberOfRecordsFromStream(in));
+
+ flowFile = runner.getFlowFilesForRelationship(QueryDatabaseTable.REL_SUCCESS).get(1);
+ assertEquals(flowFile.getAttribute("maxvalue.id"), "2");
+ in = new ByteArrayInputStream(flowFile.toByteArray());
+ assertEquals(1, getNumberOfRecordsFromStream(in));
runner.clearTransferState();
// Run again, this time no flowfiles/rows should be transferred
@@ -200,6 +206,9 @@ public class QueryDatabaseTableTest {
runner.assertAllFlowFilesTransferred(QueryDatabaseTable.REL_SUCCESS, 0);
runner.clearTransferState();
+ //Remove Max Rows Per Flow File
+ runner.setProperty(QueryDatabaseTable.MAX_ROWS_PER_FLOW_FILE,"0");
+
// Add a new row with a higher ID and run, one flowfile with one new row should be transferred
stmt.execute("insert into TEST_QUERY_DB_TABLE (id, name, scale, created_on) VALUES (3, 'Mary West', 15.0, '2000-01-01 03:23:34.234')");
runner.run();