You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by oz...@apache.org on 2016/04/04 19:58:27 UTC
[9/9] nifi git commit: NIFI-1691: Add Fetch Size property to
QueryDatabaseTable
NIFI-1691: Add Fetch Size property to QueryDatabaseTable
This closes #307
Signed-off-by: Bryan Bende <bb...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/71476b2d
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/71476b2d
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/71476b2d
Branch: refs/heads/support/nifi-0.6.x
Commit: 71476b2d3e313800dba783423d70a044f22b6469
Parents: e5bb1f5
Author: Matt Burgess <ma...@apache.org>
Authored: Mon Mar 28 12:16:24 2016 -0400
Committer: Oleg Zhurakousky <ol...@suitcase.io>
Committed: Mon Apr 4 13:54:43 2016 -0400
----------------------------------------------------------------------
.../processors/standard/QueryDatabaseTable.java | 21 ++++++++++++++++++++
.../standard/QueryDatabaseTableTest.java | 1 +
2 files changed, 22 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/nifi/blob/71476b2d/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryDatabaseTable.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryDatabaseTable.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryDatabaseTable.java
index 08f6b41..9403eb8 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryDatabaseTable.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryDatabaseTable.java
@@ -175,6 +175,15 @@ public class QueryDatabaseTable extends AbstractSessionFactoryProcessor {
.defaultValue("None")
.build();
+ public static final PropertyDescriptor FETCH_SIZE = new PropertyDescriptor.Builder()
+ .name("Fetch Size")
+ .description("The number of result rows to be fetched from the result set at a time. This is a hint to the driver and may not be "
+ + "honored and/or exact. If the value specified is zero, then the hint is ignored.")
+ .defaultValue("0")
+ .required(true)
+ .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
+ .build();
+
private final List<PropertyDescriptor> propDescriptors;
@@ -192,6 +201,7 @@ public class QueryDatabaseTable extends AbstractSessionFactoryProcessor {
pds.add(MAX_VALUE_COLUMN_NAMES);
pds.add(QUERY_TIMEOUT);
pds.add(SQL_PREPROCESS_STRATEGY);
+ pds.add(FETCH_SIZE);
propDescriptors = Collections.unmodifiableList(pds);
}
@@ -251,6 +261,8 @@ public class QueryDatabaseTable extends AbstractSessionFactoryProcessor {
final String columnNames = context.getProperty(COLUMN_NAMES).getValue();
final String maxValueColumnNames = context.getProperty(MAX_VALUE_COLUMN_NAMES).getValue();
final String preProcessStrategy = context.getProperty(SQL_PREPROCESS_STRATEGY).getValue();
+ final Integer fetchSize = context.getProperty(FETCH_SIZE).asInteger();
+
final StateManager stateManager = context.getStateManager();
final StateMap stateMap;
@@ -272,6 +284,15 @@ public class QueryDatabaseTable extends AbstractSessionFactoryProcessor {
try (final Connection con = dbcpService.getConnection();
final Statement st = con.createStatement()) {
+ if (fetchSize != null && fetchSize > 0) {
+ try {
+ st.setFetchSize(fetchSize);
+ } catch (SQLException se) {
+ // Not all drivers support this, just log the error (at debug level) and move on
+ logger.debug("Cannot set fetch size to {} due to {}", new Object[]{fetchSize, se.getLocalizedMessage()}, se);
+ }
+ }
+
final Integer queryTimeout = context.getProperty(QUERY_TIMEOUT).asTimePeriod(TimeUnit.SECONDS).intValue();
st.setQueryTimeout(queryTimeout); // timeout in seconds
http://git-wip-us.apache.org/repos/asf/nifi/blob/71476b2d/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/QueryDatabaseTableTest.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/QueryDatabaseTableTest.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/QueryDatabaseTableTest.java
index d16b9c6..f932e4d 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/QueryDatabaseTableTest.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/QueryDatabaseTableTest.java
@@ -154,6 +154,7 @@ public class QueryDatabaseTableTest {
runner.assertAllFlowFilesTransferred(QueryDatabaseTable.REL_SUCCESS, 1);
InputStream in = new ByteArrayInputStream(runner.getFlowFilesForRelationship(QueryDatabaseTable.REL_SUCCESS).get(0).toByteArray());
+ runner.setProperty(QueryDatabaseTable.FETCH_SIZE, "2");
assertEquals(3, getNumberOfRecordsFromStream(in));
runner.clearTransferState();