You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ma...@apache.org on 2016/12/14 15:54:19 UTC
nifi git commit: NIFI-3029: QueryDatabaseTable supports max fragments
property
Repository: nifi
Updated Branches:
refs/heads/master e59cf8665 -> 0f462a7c4
NIFI-3029: QueryDatabaseTable supports max fragments property
Signed-off-by: Matt Burgess <ma...@apache.org>
This closes #1213
Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/0f462a7c
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/0f462a7c
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/0f462a7c
Branch: refs/heads/master
Commit: 0f462a7c4907cb7bd03b7e2005692389bba15986
Parents: e59cf86
Author: Byunghwa Yun <co...@combineads.co.kr>
Authored: Sun Nov 13 13:45:43 2016 +0900
Committer: Matt Burgess <ma...@apache.org>
Committed: Wed Dec 14 10:51:27 2016 -0500
----------------------------------------------------------------------
.../processors/standard/QueryDatabaseTable.java | 17 ++++++++
.../standard/QueryDatabaseTableTest.java | 46 ++++++++++++++++++++
2 files changed, 63 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/nifi/blob/0f462a7c/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryDatabaseTable.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryDatabaseTable.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryDatabaseTable.java
index da3d496..00302c9 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryDatabaseTable.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryDatabaseTable.java
@@ -120,6 +120,16 @@ public class QueryDatabaseTable extends AbstractDatabaseFetchProcessor {
.addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
.build();
+ public static final PropertyDescriptor MAX_FRAGMENTS = new PropertyDescriptor.Builder()
+ .name("qdbt-max-frags")
+ .displayName("Maximum Number of Fragments")
+ .description("The maximum number of fragments. If the value specified is zero, then all fragments are returned. " +
+ "This prevents OutOfMemoryError when this processor ingests a huge table.")
+ .defaultValue("0")
+ .required(true)
+ .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
+ .build();
+
public QueryDatabaseTable() {
final Set<Relationship> r = new HashSet<>();
r.add(REL_SUCCESS);
@@ -134,6 +144,7 @@ public class QueryDatabaseTable extends AbstractDatabaseFetchProcessor {
pds.add(QUERY_TIMEOUT);
pds.add(FETCH_SIZE);
pds.add(MAX_ROWS_PER_FLOW_FILE);
+ pds.add(MAX_FRAGMENTS);
pds.add(NORMALIZE_NAMES_FOR_AVRO);
propDescriptors = Collections.unmodifiableList(pds);
}
@@ -179,6 +190,9 @@ public class QueryDatabaseTable extends AbstractDatabaseFetchProcessor {
final String maxValueColumnNames = context.getProperty(MAX_VALUE_COLUMN_NAMES).getValue();
final Integer fetchSize = context.getProperty(FETCH_SIZE).asInteger();
final Integer maxRowsPerFlowFile = context.getProperty(MAX_ROWS_PER_FLOW_FILE).asInteger();
+ final Integer maxFragments = context.getProperty(MAX_FRAGMENTS).isSet()
+ ? context.getProperty(MAX_FRAGMENTS).asInteger()
+ : 0;
final boolean convertNamesForAvro = context.getProperty(NORMALIZE_NAMES_FOR_AVRO).asBoolean();
final Map<String,String> maxValueProperties = getDefaultMaxValueProperties(context.getProperties());
@@ -283,6 +297,9 @@ public class QueryDatabaseTable extends AbstractDatabaseFetchProcessor {
}
fragmentIndex++;
+ if (maxFragments > 0 && fragmentIndex >= maxFragments) {
+ break;
+ }
}
for (int i = 0; i < resultSetFlowFiles.size(); i++) {
http://git-wip-us.apache.org/repos/asf/nifi/blob/0f462a7c/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/QueryDatabaseTableTest.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/QueryDatabaseTableTest.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/QueryDatabaseTableTest.java
index 8a6d0b1..3353a87 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/QueryDatabaseTableTest.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/QueryDatabaseTableTest.java
@@ -599,6 +599,52 @@ public class QueryDatabaseTableTest {
}
@Test
+ public void testMaxRowsPerFlowFileWithMaxFragments() throws ClassNotFoundException, SQLException, InitializationException, IOException {
+
+ // load test data to database
+ final Connection con = ((DBCPService) runner.getControllerService("dbcp")).getConnection();
+ Statement stmt = con.createStatement();
+ InputStream in;
+ MockFlowFile mff;
+
+ try {
+ stmt.execute("drop table TEST_QUERY_DB_TABLE");
+ } catch (final SQLException sqle) {
+ // Ignore this error, probably a "table does not exist" since Derby doesn't yet support DROP IF EXISTS [DERBY-4842]
+ }
+
+ stmt.execute("create table TEST_QUERY_DB_TABLE (id integer not null, name varchar(100), scale float, created_on timestamp, bignum bigint default 0)");
+ int rowCount=0;
+ //create larger row set
+ for (int batch = 0; batch < 100; batch++) {
+ stmt.execute("insert into TEST_QUERY_DB_TABLE (id, name, scale, created_on) VALUES (" + rowCount + ", 'Joe Smith', 1.0, '1962-09-23 03:23:34.234')");
+ rowCount++;
+ }
+
+ runner.setProperty(QueryDatabaseTable.TABLE_NAME, "TEST_QUERY_DB_TABLE");
+ runner.setIncomingConnection(false);
+ runner.setProperty(QueryDatabaseTable.MAX_VALUE_COLUMN_NAMES, "ID");
+ runner.setProperty(QueryDatabaseTable.MAX_ROWS_PER_FLOW_FILE, "9");
+ Integer maxFragments = 3;
+ runner.setProperty(QueryDatabaseTable.MAX_FRAGMENTS, maxFragments.toString());
+
+ runner.run();
+ runner.assertAllFlowFilesTransferred(QueryDatabaseTable.REL_SUCCESS, maxFragments);
+
+ for (int i = 0; i < maxFragments; i++) {
+ mff = runner.getFlowFilesForRelationship(QueryDatabaseTable.REL_SUCCESS).get(i);
+ in = new ByteArrayInputStream(mff.toByteArray());
+ assertEquals(9, getNumberOfRecordsFromStream(in));
+
+ mff.assertAttributeExists("fragment.identifier");
+ assertEquals(Integer.toString(i), mff.getAttribute("fragment.index"));
+ assertEquals(maxFragments.toString(), mff.getAttribute("fragment.count"));
+ }
+
+ runner.clearTransferState();
+ }
+
+ @Test
public void testInitialMaxValue() throws ClassNotFoundException, SQLException, InitializationException, IOException {
// load test data to database