You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ma...@apache.org on 2016/12/14 15:54:19 UTC

nifi git commit: NIFI-3029: QueryDatabaseTable supports max fragments property

Repository: nifi
Updated Branches:
  refs/heads/master e59cf8665 -> 0f462a7c4


NIFI-3029: QueryDatabaseTable supports max fragments property

Signed-off-by: Matt Burgess <ma...@apache.org>

This closes #1213


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/0f462a7c
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/0f462a7c
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/0f462a7c

Branch: refs/heads/master
Commit: 0f462a7c4907cb7bd03b7e2005692389bba15986
Parents: e59cf86
Author: Byunghwa Yun <co...@combineads.co.kr>
Authored: Sun Nov 13 13:45:43 2016 +0900
Committer: Matt Burgess <ma...@apache.org>
Committed: Wed Dec 14 10:51:27 2016 -0500

----------------------------------------------------------------------
 .../processors/standard/QueryDatabaseTable.java | 17 ++++++++
 .../standard/QueryDatabaseTableTest.java        | 46 ++++++++++++++++++++
 2 files changed, 63 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/0f462a7c/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryDatabaseTable.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryDatabaseTable.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryDatabaseTable.java
index da3d496..00302c9 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryDatabaseTable.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryDatabaseTable.java
@@ -120,6 +120,16 @@ public class QueryDatabaseTable extends AbstractDatabaseFetchProcessor {
             .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
             .build();
 
+    public static final PropertyDescriptor MAX_FRAGMENTS = new PropertyDescriptor.Builder() // property that caps how many fragments one run emits
+            .name("qdbt-max-frags")
+            .displayName("Maximum Number of Fragments")
+            .description("The maximum number of fragments. If the value specified is zero, then all fragments are returned. " +
+                    "This prevents OutOfMemoryError when this processor ingests huge table.")
+            .defaultValue("0") // 0 is documented in the description above as "all fragments are returned", i.e. no cap
+            .required(true)
+            .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR) // presumably rejects negative or non-integer values; confirm against StandardValidators
+            .build();
+
     public QueryDatabaseTable() {
         final Set<Relationship> r = new HashSet<>();
         r.add(REL_SUCCESS);
@@ -134,6 +144,7 @@ public class QueryDatabaseTable extends AbstractDatabaseFetchProcessor {
         pds.add(QUERY_TIMEOUT);
         pds.add(FETCH_SIZE);
         pds.add(MAX_ROWS_PER_FLOW_FILE);
+        pds.add(MAX_FRAGMENTS);
         pds.add(NORMALIZE_NAMES_FOR_AVRO);
         propDescriptors = Collections.unmodifiableList(pds);
     }
@@ -179,6 +190,9 @@ public class QueryDatabaseTable extends AbstractDatabaseFetchProcessor {
         final String maxValueColumnNames = context.getProperty(MAX_VALUE_COLUMN_NAMES).getValue();
         final Integer fetchSize = context.getProperty(FETCH_SIZE).asInteger();
         final Integer maxRowsPerFlowFile = context.getProperty(MAX_ROWS_PER_FLOW_FILE).asInteger();
+        final Integer maxFragments = context.getProperty(MAX_FRAGMENTS).isSet()
+                ? context.getProperty(MAX_FRAGMENTS).asInteger()
+                : 0;
         final boolean convertNamesForAvro = context.getProperty(NORMALIZE_NAMES_FOR_AVRO).asBoolean();
 
         final Map<String,String> maxValueProperties = getDefaultMaxValueProperties(context.getProperties());
@@ -283,6 +297,9 @@ public class QueryDatabaseTable extends AbstractDatabaseFetchProcessor {
                     }
 
                     fragmentIndex++;
+                    if (maxFragments > 0 && fragmentIndex >= maxFragments) {
+                        break;
+                    }
                 }
 
                 for (int i = 0; i < resultSetFlowFiles.size(); i++) {

http://git-wip-us.apache.org/repos/asf/nifi/blob/0f462a7c/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/QueryDatabaseTableTest.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/QueryDatabaseTableTest.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/QueryDatabaseTableTest.java
index 8a6d0b1..3353a87 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/QueryDatabaseTableTest.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/QueryDatabaseTableTest.java
@@ -599,6 +599,52 @@ public class QueryDatabaseTableTest {
     }
 
     @Test
+    public void testMaxRowsPerFlowFileWithMaxFragments() throws ClassNotFoundException, SQLException, InitializationException, IOException { // 100 rows, 9 rows per flow file, cap of 3 fragments: expects exactly 3 flow files of 9 rows each
+
+        // load test data to database
+        final Connection con = ((DBCPService) runner.getControllerService("dbcp")).getConnection();
+        Statement stmt = con.createStatement();
+        InputStream in;
+        MockFlowFile mff;
+
+        try {
+            stmt.execute("drop table TEST_QUERY_DB_TABLE"); // start from a clean table; failure is tolerated below
+        } catch (final SQLException sqle) {
+            // Ignore this error, probably a "table does not exist" since Derby doesn't yet support DROP IF EXISTS [DERBY-4842]
+        }
+
+        stmt.execute("create table TEST_QUERY_DB_TABLE (id integer not null, name varchar(100), scale float, created_on timestamp, bignum bigint default 0)");
+        int rowCount=0;
+        //create larger row set
+        for (int batch = 0; batch < 100; batch++) { // inserts 100 rows, one per iteration, with ids 0..99
+            stmt.execute("insert into TEST_QUERY_DB_TABLE (id, name, scale, created_on) VALUES (" + rowCount + ", 'Joe Smith', 1.0, '1962-09-23 03:23:34.234')");
+            rowCount++;
+        }
+
+        runner.setProperty(QueryDatabaseTable.TABLE_NAME, "TEST_QUERY_DB_TABLE");
+        runner.setIncomingConnection(false);
+        runner.setProperty(QueryDatabaseTable.MAX_VALUE_COLUMN_NAMES, "ID");
+        runner.setProperty(QueryDatabaseTable.MAX_ROWS_PER_FLOW_FILE, "9"); // 3 fragments x 9 rows = 27 < 100, so the fragment cap is what limits the output
+        Integer maxFragments = 3;
+        runner.setProperty(QueryDatabaseTable.MAX_FRAGMENTS, maxFragments.toString());
+
+        runner.run();
+        runner.assertAllFlowFilesTransferred(QueryDatabaseTable.REL_SUCCESS, maxFragments); // expects exactly maxFragments flow files, all on the success relationship
+
+        for (int i = 0; i < maxFragments; i++) { // verify each emitted fragment in index order
+            mff = runner.getFlowFilesForRelationship(QueryDatabaseTable.REL_SUCCESS).get(i);
+            in = new ByteArrayInputStream(mff.toByteArray());
+            assertEquals(9, getNumberOfRecordsFromStream(in)); // every emitted fragment is a full 9-row file
+
+            mff.assertAttributeExists("fragment.identifier");
+            assertEquals(Integer.toString(i), mff.getAttribute("fragment.index")); // fragment.index is zero-based and matches output order
+            assertEquals(maxFragments.toString(), mff.getAttribute("fragment.count")); // fragment.count is expected to equal the cap (3), not the uncapped total
+        }
+
+        runner.clearTransferState();
+    }
+
+    @Test
     public void testInitialMaxValue() throws ClassNotFoundException, SQLException, InitializationException, IOException {
 
         // load test data to database