Posted to commits@nifi.apache.org by pv...@apache.org on 2018/02/17 14:56:24 UTC

nifi git commit: NIFI-4836 - Allow output of FlowFiles during result set processing in QueryDatabaseTable

Repository: nifi
Updated Branches:
  refs/heads/master 82e36f3c7 -> 2a5e21c11


NIFI-4836 - Allow output of FlowFiles during result set processing in QueryDatabaseTable

Signed-off-by: Pierre Villard <pi...@gmail.com>

This closes #2447.
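
For context, a minimal usage sketch mirroring the unit test added in this commit (Java; assumes a DBCP controller service registered as "dbcp", whose setup is elided here, and a surrounding test method):

import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;

// Sketch only: exercises the new property the same way the test below does.
TestRunner runner = TestRunners.newTestRunner(QueryDatabaseTable.class);
runner.setIncomingConnection(false);
runner.setProperty(QueryDatabaseTable.TABLE_NAME, "TEST_QUERY_DB_TABLE");
runner.setProperty(QueryDatabaseTable.MAX_VALUE_COLUMN_NAMES, "ID");
runner.setProperty(QueryDatabaseTable.MAX_ROWS_PER_FLOW_FILE, "7");
// New in this commit: commit the session after every 4 queued FlowFiles so
// results reach the downstream relationship while the result set is still
// being read.
runner.setProperty(QueryDatabaseTable.OUTPUT_BATCH_SIZE, "4");
runner.run();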


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/2a5e21c1
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/2a5e21c1
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/2a5e21c1

Branch: refs/heads/master
Commit: 2a5e21c11b4c801c6b14e978f80f79cf807483a0
Parents: 82e36f3
Author: Matthew Burgess <ma...@apache.org>
Authored: Thu Feb 1 10:08:09 2018 -0500
Committer: Pierre Villard <pi...@gmail.com>
Committed: Sat Feb 17 15:56:05 2018 +0100

----------------------------------------------------------------------
 .../processors/standard/QueryDatabaseTable.java | 67 ++++++++++++++------
 .../standard/QueryDatabaseTableTest.java        | 57 +++++++++++++++++
 2 files changed, 104 insertions(+), 20 deletions(-)
----------------------------------------------------------------------
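
The heart of the change in QueryDatabaseTable.java is an incremental session commit: instead of holding every FlowFile until the result set is exhausted, the processor releases a batch whenever the queue reaches Output Batch Size. A distilled sketch of that pattern (names match the hunk below; the ProcessSession and success relationship come from the surrounding processor):

import java.util.List;

import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;

// Distilled from the change below: release FlowFiles in batches instead of
// holding the entire result set until the end of onTrigger.
void queueAndMaybeRelease(ProcessSession session, Relationship success,
                          List<FlowFile> resultSetFlowFiles,
                          int outputBatchSize, FlowFile fileToProcess) {
    resultSetFlowFiles.add(fileToProcess);
    if (outputBatchSize > 0 && resultSetFlowFiles.size() >= outputBatchSize) {
        // Transferring and committing here makes the batch visible to the
        // downstream relationship immediately; the cleared list starts the
        // next batch.
        session.transfer(resultSetFlowFiles, success);
        session.commit();
        resultSetFlowFiles.clear();
    }
}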


http://git-wip-us.apache.org/repos/asf/nifi/blob/2a5e21c1/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryDatabaseTable.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryDatabaseTable.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryDatabaseTable.java
index ffe5845..5613206 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryDatabaseTable.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryDatabaseTable.java
@@ -94,15 +94,16 @@ import static org.apache.nifi.processors.standard.util.JdbcCommon.USE_AVRO_LOGIC
         @WritesAttribute(attribute = "querydbtable.row.count", description="The number of rows selected by the query"),
         @WritesAttribute(attribute="fragment.identifier", description="If 'Max Rows Per Flow File' is set then all FlowFiles from the same query result set "
                 + "will have the same value for the fragment.identifier attribute. This can then be used to correlate the results."),
-        @WritesAttribute(attribute="fragment.count", description="If 'Max Rows Per Flow File' is set then this is the total number of  "
+        @WritesAttribute(attribute = "fragment.count", description = "If 'Max Rows Per Flow File' is set then this is the total number of  "
                 + "FlowFiles produced by a single ResultSet. This can be used in conjunction with the "
-                + "fragment.identifier attribute in order to know how many FlowFiles belonged to the same incoming ResultSet."),
+                + "fragment.identifier attribute in order to know how many FlowFiles belonged to the same incoming ResultSet. If Output Batch Size is set, then this "
+                + "attribute will not be populated."),
         @WritesAttribute(attribute="fragment.index", description="If 'Max Rows Per Flow File' is set then the position of this FlowFile in the list of "
                 + "outgoing FlowFiles that were all derived from the same result set FlowFile. This can be "
                 + "used in conjunction with the fragment.identifier attribute to know which FlowFiles originated from the same query result set and in what order  "
                 + "FlowFiles were produced"),
         @WritesAttribute(attribute = "maxvalue.*", description = "Each attribute contains the observed maximum value of a specified 'Maximum-value Column'. The "
-                + "suffix of the attribute is the name of the column")})
+                + "suffix of the attribute is the name of the column. If Output Batch Size is set, then this attribute will not be populated.")})
 @DynamicProperty(name = "Initial Max Value", value = "Attribute Expression Language", supportsExpressionLanguage = false, description = "Specifies an initial "
         + "max value for max value columns. Properties should be added in the format `initial.maxvalue.{max_value_column}`.")
 public class QueryDatabaseTable extends AbstractDatabaseFetchProcessor {
@@ -112,7 +113,7 @@ public class QueryDatabaseTable extends AbstractDatabaseFetchProcessor {
 
     public static final PropertyDescriptor FETCH_SIZE = new PropertyDescriptor.Builder()
             .name("Fetch Size")
-            .description("The number of result rows to be fetched from the result set at a time. This is a hint to the driver and may not be "
+            .description("The number of result rows to be fetched from the result set at a time. This is a hint to the database driver and may not be "
                     + "honored and/or exact. If the value specified is zero, then the hint is ignored.")
             .defaultValue("0")
             .required(true)
@@ -123,8 +124,22 @@ public class QueryDatabaseTable extends AbstractDatabaseFetchProcessor {
     public static final PropertyDescriptor MAX_ROWS_PER_FLOW_FILE = new PropertyDescriptor.Builder()
             .name("qdbt-max-rows")
             .displayName("Max Rows Per Flow File")
-            .description("The maximum number of result rows that will be included in a single FlowFile. " +
-                    "This will allow you to break up very large result sets into multiple FlowFiles. If the value specified is zero, then all rows are returned in a single FlowFile.")
+            .description("The maximum number of result rows that will be included in a single FlowFile. This will allow you to break up very large "
+                    + "result sets into multiple FlowFiles. If the value specified is zero, then all rows are returned in a single FlowFile.")
+            .defaultValue("0")
+            .required(true)
+            .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
+            .expressionLanguageSupported(true)
+            .build();
+
+    public static final PropertyDescriptor OUTPUT_BATCH_SIZE = new PropertyDescriptor.Builder()
+            .name("qdbt-output-batch-size")
+            .displayName("Output Batch Size")
+            .description("The number of output FlowFiles to queue before committing the process session. When set to zero, the session will be committed when all result set rows "
+                    + "have been processed and the output FlowFiles are ready for transfer to the downstream relationship. For large result sets, this can cause a large burst of FlowFiles "
+                    + "to be transferred at the end of processor execution. If this property is set, then when the specified number of FlowFiles are ready for transfer, then the session will "
+                    + "be committed, thus releasing the FlowFiles to the downstream relationship. NOTE: The maxvalue.* and fragment.count attributes will not be set on FlowFiles when this "
+                    + "property is set.")
             .defaultValue("0")
             .required(true)
             .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
@@ -135,7 +150,8 @@ public class QueryDatabaseTable extends AbstractDatabaseFetchProcessor {
             .name("qdbt-max-frags")
             .displayName("Maximum Number of Fragments")
             .description("The maximum number of fragments. If the value specified is zero, then all fragments are returned. " +
-                    "This prevents OutOfMemoryError when this processor ingests huge table.")
+                    "This prevents OutOfMemoryError when this processor ingests huge table. NOTE: Setting this property can result in data loss, as the incoming results are "
+                    + "not ordered, and fragments may end at arbitrary boundaries where rows are not included in the result set.")
             .defaultValue("0")
             .required(true)
             .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
@@ -156,6 +172,7 @@ public class QueryDatabaseTable extends AbstractDatabaseFetchProcessor {
         pds.add(QUERY_TIMEOUT);
         pds.add(FETCH_SIZE);
         pds.add(MAX_ROWS_PER_FLOW_FILE);
+        pds.add(OUTPUT_BATCH_SIZE);
         pds.add(MAX_FRAGMENTS);
         pds.add(NORMALIZE_NAMES_FOR_AVRO);
         pds.add(USE_AVRO_LOGICAL_TYPES);
@@ -199,6 +216,8 @@ public class QueryDatabaseTable extends AbstractDatabaseFetchProcessor {
         final String customWhereClause = context.getProperty(WHERE_CLAUSE).evaluateAttributeExpressions().getValue();
         final Integer fetchSize = context.getProperty(FETCH_SIZE).evaluateAttributeExpressions().asInteger();
         final Integer maxRowsPerFlowFile = context.getProperty(MAX_ROWS_PER_FLOW_FILE).evaluateAttributeExpressions().asInteger();
+        final Integer outputBatchSizeField = context.getProperty(OUTPUT_BATCH_SIZE).evaluateAttributeExpressions().asInteger();
+        final int outputBatchSize = outputBatchSizeField == null ? 0 : outputBatchSizeField;
         final Integer maxFragments = context.getProperty(MAX_FRAGMENTS).isSet()
                 ? context.getProperty(MAX_FRAGMENTS).evaluateAttributeExpressions().asInteger()
                 : 0;
@@ -315,6 +334,12 @@ public class QueryDatabaseTable extends AbstractDatabaseFetchProcessor {
 
                         session.getProvenanceReporter().receive(fileToProcess, jdbcURL, stopWatch.getElapsed(TimeUnit.MILLISECONDS));
                         resultSetFlowFiles.add(fileToProcess);
+                        // If we've reached the batch size, send out the flow files
+                        if (outputBatchSize > 0 && resultSetFlowFiles.size() >= outputBatchSize) {
+                            session.transfer(resultSetFlowFiles, REL_SUCCESS);
+                            session.commit();
+                            resultSetFlowFiles.clear();
+                        }
                     } else {
                         // If there were no rows returned, don't send the flowfile
                         session.remove(fileToProcess);
@@ -328,22 +353,24 @@ public class QueryDatabaseTable extends AbstractDatabaseFetchProcessor {
                     }
                 }
 
-                for (int i = 0; i < resultSetFlowFiles.size(); i++) {
-                    // Add maximum values as attributes
-                    for (Map.Entry<String, String> entry : statePropertyMap.entrySet()) {
-                        // Get just the column name from the key
-                        String key = entry.getKey();
-                        String colName = key.substring(key.lastIndexOf(NAMESPACE_DELIMITER) + NAMESPACE_DELIMITER.length());
-                        resultSetFlowFiles.set(i, session.putAttribute(resultSetFlowFiles.get(i), "maxvalue." + colName, entry.getValue()));
-                    }
+                // Even though the maximum value and total count are known at this point, to maintain consistent behavior if Output Batch Size is set, do not store the attributes
+                if (outputBatchSize == 0) {
+                    for (int i = 0; i < resultSetFlowFiles.size(); i++) {
+                        // Add maximum values as attributes
+                        for (Map.Entry<String, String> entry : statePropertyMap.entrySet()) {
+                            // Get just the column name from the key
+                            String key = entry.getKey();
+                            String colName = key.substring(key.lastIndexOf(NAMESPACE_DELIMITER) + NAMESPACE_DELIMITER.length());
+                            resultSetFlowFiles.set(i, session.putAttribute(resultSetFlowFiles.get(i), "maxvalue." + colName, entry.getValue()));
+                        }
 
-                    //set count on all FlowFiles
-                    if(maxRowsPerFlowFile > 0) {
-                        resultSetFlowFiles.set(i,
-                                session.putAttribute(resultSetFlowFiles.get(i), "fragment.count", Integer.toString(fragmentIndex)));
+                        //set count on all FlowFiles
+                        if (maxRowsPerFlowFile > 0) {
+                            resultSetFlowFiles.set(i,
+                                    session.putAttribute(resultSetFlowFiles.get(i), "fragment.count", Integer.toString(fragmentIndex)));
+                        }
                     }
                 }
-
             } catch (final SQLException e) {
                 throw e;
             }
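
A consequence worth noting from the descriptions above: once a batch has been committed, those FlowFiles are immutable, so the maxvalue.* and fragment.count attributes, which are only known after the full result set has been read, can never be applied to them. Downstream consumers can still regroup fragments by fragment.identifier and order them by fragment.index. A hypothetical helper illustrating that correlation (plain Java, not part of this commit; attribute maps stand in for FlowFiles):

import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class FragmentCorrelator {
    // Group fragment attribute maps by fragment.identifier and order each
    // group by fragment.index; works whether or not fragment.count is present.
    static Map<String, List<Map<String, String>>> correlate(List<Map<String, String>> fragments) {
        final Map<String, List<Map<String, String>>> byResultSet = new HashMap<>();
        for (final Map<String, String> attrs : fragments) {
            byResultSet.computeIfAbsent(attrs.get("fragment.identifier"), k -> new ArrayList<>()).add(attrs);
        }
        for (final List<Map<String, String>> group : byResultSet.values()) {
            group.sort(Comparator.comparingInt(a -> Integer.parseInt(a.get("fragment.index"))));
        }
        return byResultSet;
    }
}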

http://git-wip-us.apache.org/repos/asf/nifi/blob/2a5e21c1/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/QueryDatabaseTableTest.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/QueryDatabaseTableTest.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/QueryDatabaseTableTest.java
index 8a2319b..d805423 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/QueryDatabaseTableTest.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/QueryDatabaseTableTest.java
@@ -60,6 +60,7 @@ import java.util.Map;
 import java.util.TimeZone;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 
 /**
@@ -500,6 +501,62 @@ public class QueryDatabaseTableTest {
     }
 
     @Test
+    public void testOutputBatchSize() throws ClassNotFoundException, SQLException, InitializationException, IOException {
+
+        // load test data to database
+        final Connection con = ((DBCPService) runner.getControllerService("dbcp")).getConnection();
+        Statement stmt = con.createStatement();
+        InputStream in;
+        MockFlowFile mff;
+
+        try {
+            stmt.execute("drop table TEST_QUERY_DB_TABLE");
+        } catch (final SQLException sqle) {
+            // Ignore this error, probably a "table does not exist" since Derby doesn't yet support DROP IF EXISTS [DERBY-4842]
+        }
+
+        stmt.execute("create table TEST_QUERY_DB_TABLE (id integer not null, name varchar(100), scale float, created_on timestamp, bignum bigint default 0)");
+        int rowCount=0;
+        // Create larger row set
+        for(int batch=0;batch<100;batch++){
+            stmt.execute("insert into TEST_QUERY_DB_TABLE (id, name, scale, created_on) VALUES (" + rowCount + ", 'Joe Smith', 1.0, '1962-09-23 03:23:34.234')");
+            rowCount++;
+        }
+
+        runner.setProperty(QueryDatabaseTable.TABLE_NAME, "TEST_QUERY_DB_TABLE");
+        runner.setIncomingConnection(false);
+        runner.setProperty(QueryDatabaseTable.MAX_VALUE_COLUMN_NAMES, "ID");
+        runner.setProperty(QueryDatabaseTable.MAX_ROWS_PER_FLOW_FILE, "${" + MAX_ROWS_KEY + "}");
+        runner.setVariable(MAX_ROWS_KEY, "7");
+        runner.setProperty(QueryDatabaseTable.OUTPUT_BATCH_SIZE, "${outputBatchSize}");
+        runner.setVariable("outputBatchSize", "4");
+
+        runner.run();
+        runner.assertAllFlowFilesTransferred(QueryDatabaseTable.REL_SUCCESS, 15);
+
+        // Ensure all but the last file have 7 records each
+        for(int ff=0;ff<14;ff++) {
+            mff = runner.getFlowFilesForRelationship(QueryDatabaseTable.REL_SUCCESS).get(ff);
+            in = new ByteArrayInputStream(mff.toByteArray());
+            assertEquals(7, getNumberOfRecordsFromStream(in));
+
+            mff.assertAttributeExists("fragment.identifier");
+            assertEquals(Integer.toString(ff), mff.getAttribute("fragment.index"));
+            // No fragment.count set for flow files sent when Output Batch Size is set
+            assertNull(mff.getAttribute("fragment.count"));
+        }
+
+        // Last file should have 2 records
+        mff = runner.getFlowFilesForRelationship(QueryDatabaseTable.REL_SUCCESS).get(14);
+        in = new ByteArrayInputStream(mff.toByteArray());
+        assertEquals(2, getNumberOfRecordsFromStream(in));
+        mff.assertAttributeExists("fragment.identifier");
+        assertEquals(Integer.toString(14), mff.getAttribute("fragment.index"));
+        // No fragment.count set for flow files sent when Output Batch Size is set
+        assertNull(mff.getAttribute("fragment.count"));
+    }
+
+    @Test
     public void testMaxRowsPerFlowFile() throws ClassNotFoundException, SQLException, InitializationException, IOException {
 
         // load test data to database