You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ma...@apache.org on 2016/09/21 15:18:17 UTC

nifi git commit: NIFI-2749

Repository: nifi
Updated Branches:
  refs/heads/master fda15d916 -> 938c7cccb


NIFI-2749

Signed-off-by: Matt Burgess <ma...@apache.org>

This closes #997


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/938c7ccc
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/938c7ccc
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/938c7ccc

Branch: refs/heads/master
Commit: 938c7cccb8b251d4a1390cf0078b14f53bc94a57
Parents: fda15d9
Author: Peter Wicks <PW...@MICRON.COM>
Authored: Thu Sep 8 21:37:21 2016 -0600
Committer: Matt Burgess <ma...@apache.org>
Committed: Wed Sep 21 11:16:57 2016 -0400

----------------------------------------------------------------------
 .../processors/standard/QueryDatabaseTable.java    | 17 +++++++++--------
 .../standard/QueryDatabaseTableTest.java           | 13 +++++++++++--
 2 files changed, 20 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/938c7ccc/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryDatabaseTable.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryDatabaseTable.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryDatabaseTable.java
index 31bec27..278cc30 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryDatabaseTable.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryDatabaseTable.java
@@ -270,11 +270,6 @@ public class QueryDatabaseTable extends AbstractDatabaseFetchProcessor {
                             fileToProcess = session.putAttribute(fileToProcess, "fragment.index", String.valueOf(fragmentIndex));
                         }
 
-                        // Add maximum values as attributes
-                        for (Map.Entry<String, String> entry : statePropertyMap.entrySet()) {
-                            fileToProcess = session.putAttribute(fileToProcess, "maxvalue." + entry.getKey(), entry.getValue());
-                        }
-
                         logger.info("{} contains {} Avro records; transferring to 'success'",
                                 new Object[]{fileToProcess, nrOfRows.get()});
 
@@ -290,13 +285,19 @@ public class QueryDatabaseTable extends AbstractDatabaseFetchProcessor {
                     fragmentIndex++;
                 }
 
-                //set count on all FlowFiles
-                if(maxRowsPerFlowFile > 0) {
-                    for (int i = 0; i < resultSetFlowFiles.size(); i++) {
+                for (int i = 0; i < resultSetFlowFiles.size(); i++) {
+                    // Add maximum values as attributes
+                    for (Map.Entry<String, String> entry : statePropertyMap.entrySet()) {
+                        resultSetFlowFiles.set(i, session.putAttribute(resultSetFlowFiles.get(i), "maxvalue." + entry.getKey(), entry.getValue()));
+                    }
+
+                    //set count on all FlowFiles
+                    if(maxRowsPerFlowFile > 0) {
                         resultSetFlowFiles.set(i,
                                 session.putAttribute(resultSetFlowFiles.get(i), "fragment.count", Integer.toString(fragmentIndex)));
                     }
                 }
+
             } catch (final SQLException e) {
                 throw e;
             }

http://git-wip-us.apache.org/repos/asf/nifi/blob/938c7ccc/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/QueryDatabaseTableTest.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/QueryDatabaseTableTest.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/QueryDatabaseTableTest.java
index 974a835..f3904ef 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/QueryDatabaseTableTest.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/QueryDatabaseTableTest.java
@@ -184,15 +184,21 @@ public class QueryDatabaseTableTest {
         runner.setProperty(QueryDatabaseTable.TABLE_NAME, "TEST_QUERY_DB_TABLE");
         runner.setIncomingConnection(false);
         runner.setProperty(QueryDatabaseTable.MAX_VALUE_COLUMN_NAMES, "ID");
+        runner.setProperty(QueryDatabaseTable.MAX_ROWS_PER_FLOW_FILE,"2");
 
         runner.run();
-        runner.assertAllFlowFilesTransferred(QueryDatabaseTable.REL_SUCCESS, 1);
+        runner.assertAllFlowFilesTransferred(QueryDatabaseTable.REL_SUCCESS, 2);
 
         MockFlowFile flowFile = runner.getFlowFilesForRelationship(QueryDatabaseTable.REL_SUCCESS).get(0);
         assertEquals(flowFile.getAttribute("maxvalue.id"), "2");
         InputStream in = new ByteArrayInputStream(flowFile.toByteArray());
         runner.setProperty(QueryDatabaseTable.FETCH_SIZE, "2");
-        assertEquals(3, getNumberOfRecordsFromStream(in));
+        assertEquals(2, getNumberOfRecordsFromStream(in));
+
+        flowFile = runner.getFlowFilesForRelationship(QueryDatabaseTable.REL_SUCCESS).get(1);
+        assertEquals(flowFile.getAttribute("maxvalue.id"), "2");
+        in = new ByteArrayInputStream(flowFile.toByteArray());
+        assertEquals(1, getNumberOfRecordsFromStream(in));
         runner.clearTransferState();
 
         // Run again, this time no flowfiles/rows should be transferred
@@ -200,6 +206,9 @@ public class QueryDatabaseTableTest {
         runner.assertAllFlowFilesTransferred(QueryDatabaseTable.REL_SUCCESS, 0);
         runner.clearTransferState();
 
+        //Remove Max Rows Per Flow File
+        runner.setProperty(QueryDatabaseTable.MAX_ROWS_PER_FLOW_FILE,"0");
+
         // Add a new row with a higher ID and run, one flowfile with one new row should be transferred
         stmt.execute("insert into TEST_QUERY_DB_TABLE (id, name, scale, created_on) VALUES (3, 'Mary West', 15.0, '2000-01-01 03:23:34.234')");
         runner.run();