You are viewing a plain-text version of this content. The link to the canonical version (originally the word "here") was not preserved in this plain-text copy.
Posted to commits@nifi.apache.org by ma...@apache.org on 2018/07/09 17:38:00 UTC
nifi git commit: NIFI-1251 ExecuteSQL Max Rows and Output Batching
Repository: nifi
Updated Branches:
refs/heads/master 12e384b4b -> 382653654
NIFI-1251 ExecuteSQL Max Rows and Output Batching
NIFI-1251: Fixed doc and checkstyle issues
Signed-off-by: Matthew Burgess <ma...@apache.org>
This closes #2834
Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/38265365
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/38265365
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/38265365
Branch: refs/heads/master
Commit: 382653654652ac6677b6171a48018f648952428b
Parents: 12e384b
Author: patricker <pa...@gmail.com>
Authored: Fri Jun 29 14:10:14 2018 -0600
Committer: Matthew Burgess <ma...@apache.org>
Committed: Mon Jul 9 13:36:45 2018 -0400
----------------------------------------------------------------------
.../nifi/processors/standard/ExecuteSQL.java | 173 ++++++++++++++-----
.../processors/standard/QueryDatabaseTable.java | 13 +-
.../processors/standard/util/JdbcCommon.java | 1 -
.../processors/standard/TestExecuteSQL.java | 91 ++++++++++
4 files changed, 234 insertions(+), 44 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/nifi/blob/38265365/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteSQL.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteSQL.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteSQL.java
index 66c2338..6cd3ae6 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteSQL.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteSQL.java
@@ -27,6 +27,7 @@ import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
+import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
@@ -46,6 +47,7 @@ import org.apache.nifi.dbcp.DBCPService;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.flowfile.attributes.FragmentAttributes;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
@@ -96,7 +98,17 @@ import static org.apache.nifi.processors.standard.util.JdbcCommon.USE_AVRO_LOGIC
@WritesAttribute(attribute="executesql.query.executiontime", description = "Duration of the query execution time in milliseconds"),
@WritesAttribute(attribute="executesql.query.fetchtime", description = "Duration of the result set fetch time in milliseconds"),
@WritesAttribute(attribute="executesql.resultset.index", description = "Assuming multiple result sets are returned, "
- + "the zero based index of this result set.")
+ + "the zero based index of this result set."),
+ @WritesAttribute(attribute="fragment.identifier", description="If 'Max Rows Per Flow File' is set then all FlowFiles from the same query result set "
+ + "will have the same value for the fragment.identifier attribute. This can then be used to correlate the results."),
+ @WritesAttribute(attribute = "fragment.count", description = "If 'Max Rows Per Flow File' is set then this is the total number of "
+ + "FlowFiles produced by a single ResultSet. This can be used in conjunction with the "
+ + "fragment.identifier attribute in order to know how many FlowFiles belonged to the same incoming ResultSet. If Output Batch Size is set, then this "
+ + "attribute will not be populated."),
+ @WritesAttribute(attribute="fragment.index", description="If 'Max Rows Per Flow File' is set then the position of this FlowFile in the list of "
+ + "outgoing FlowFiles that were all derived from the same result set FlowFile. This can be "
+ + "used in conjunction with the fragment.identifier attribute to know which FlowFiles originated from the same query result set and in what order "
+ + "FlowFiles were produced")
})
public class ExecuteSQL extends AbstractProcessor {
@@ -106,6 +118,10 @@ public class ExecuteSQL extends AbstractProcessor {
public static final String RESULT_QUERY_FETCH_TIME = "executesql.query.fetchtime";
public static final String RESULTSET_INDEX = "executesql.resultset.index";
+ public static final String FRAGMENT_ID = FragmentAttributes.FRAGMENT_ID.key();
+ public static final String FRAGMENT_INDEX = FragmentAttributes.FRAGMENT_INDEX.key();
+ public static final String FRAGMENT_COUNT = FragmentAttributes.FRAGMENT_COUNT.key();
+
// Relationships
public static final Relationship REL_SUCCESS = new Relationship.Builder()
.name("success")
@@ -146,6 +162,31 @@ public class ExecuteSQL extends AbstractProcessor {
.sensitive(false)
.build();
+ public static final PropertyDescriptor MAX_ROWS_PER_FLOW_FILE = new PropertyDescriptor.Builder()
+ .name("esql-max-rows")
+ .displayName("Max Rows Per Flow File")
+ .description("The maximum number of result rows that will be included in a single FlowFile. This will allow you to break up very large "
+ + "result sets into multiple FlowFiles. If the value specified is zero, then all rows are returned in a single FlowFile.")
+ .defaultValue("0")
+ .required(true)
+ .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
+ .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+ .build();
+
+ public static final PropertyDescriptor OUTPUT_BATCH_SIZE = new PropertyDescriptor.Builder()
+ .name("esql-output-batch-size")
+ .displayName("Output Batch Size")
+ .description("The number of output FlowFiles to queue before committing the process session. When set to zero, the session will be committed when all result set rows "
+ + "have been processed and the output FlowFiles are ready for transfer to the downstream relationship. For large result sets, this can cause a large burst of FlowFiles "
+ + "to be transferred at the end of processor execution. If this property is set, then when the specified number of FlowFiles are ready for transfer, then the session will "
+ + "be committed, thus releasing the FlowFiles to the downstream relationship. NOTE: The fragment.count attribute will not be set on FlowFiles when this "
+ + "property is set.")
+ .defaultValue("0")
+ .required(true)
+ .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
+ .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+ .build();
+
private final List<PropertyDescriptor> propDescriptors;
public ExecuteSQL() {
@@ -162,6 +203,8 @@ public class ExecuteSQL extends AbstractProcessor {
pds.add(USE_AVRO_LOGICAL_TYPES);
pds.add(DEFAULT_PRECISION);
pds.add(DEFAULT_SCALE);
+ pds.add(MAX_ROWS_PER_FLOW_FILE);
+ pds.add(OUTPUT_BATCH_SIZE);
propDescriptors = Collections.unmodifiableList(pds);
}
@@ -200,11 +243,16 @@ public class ExecuteSQL extends AbstractProcessor {
}
}
+ final List<FlowFile> resultSetFlowFiles = new ArrayList<>();
+
final ComponentLog logger = getLogger();
final DBCPService dbcpService = context.getProperty(DBCP_SERVICE).asControllerService(DBCPService.class);
final Integer queryTimeout = context.getProperty(QUERY_TIMEOUT).asTimePeriod(TimeUnit.SECONDS).intValue();
final boolean convertNamesForAvro = context.getProperty(NORMALIZE_NAMES_FOR_AVRO).asBoolean();
final Boolean useAvroLogicalTypes = context.getProperty(USE_AVRO_LOGICAL_TYPES).asBoolean();
+ final Integer maxRowsPerFlowFile = context.getProperty(MAX_ROWS_PER_FLOW_FILE).evaluateAttributeExpressions().asInteger();
+ final Integer outputBatchSizeField = context.getProperty(OUTPUT_BATCH_SIZE).evaluateAttributeExpressions().asInteger();
+ final int outputBatchSize = outputBatchSizeField == null ? 0 : outputBatchSizeField;
final Integer defaultPrecision = context.getProperty(DEFAULT_PRECISION).evaluateAttributeExpressions(fileToProcess).asInteger();
final Integer defaultScale = context.getProperty(DEFAULT_SCALE).evaluateAttributeExpressions(fileToProcess).asInteger();
final String selectQuery;
@@ -227,6 +275,10 @@ public class ExecuteSQL extends AbstractProcessor {
JdbcCommon.setParameters(st, fileToProcess.getAttributes());
}
logger.debug("Executing query {}", new Object[]{selectQuery});
+
+ int fragmentIndex=0;
+ final String fragmentId = UUID.randomUUID().toString();
+
final StopWatch executionTime = new StopWatch(true);
boolean hasResults = st.execute();
@@ -238,47 +290,84 @@ public class ExecuteSQL extends AbstractProcessor {
while(hasResults || hasUpdateCount) {
//getMoreResults() and execute() return false to indicate that the result of the statement is just a number and not a ResultSet
if (hasResults) {
- FlowFile resultSetFF;
- if (fileToProcess == null) {
- resultSetFF = session.create();
- } else {
- resultSetFF = session.create(fileToProcess);
- resultSetFF = session.putAllAttributes(resultSetFF, fileToProcess.getAttributes());
- }
-
- final StopWatch fetchTime = new StopWatch(true);
-
final AtomicLong nrOfRows = new AtomicLong(0L);
- resultSetFF = session.write(resultSetFF, out -> {
- try {
-
- final ResultSet resultSet = st.getResultSet();
- final JdbcCommon.AvroConversionOptions options = JdbcCommon.AvroConversionOptions.builder()
- .convertNames(convertNamesForAvro)
- .useLogicalTypes(useAvroLogicalTypes)
- .defaultPrecision(defaultPrecision)
- .defaultScale(defaultScale)
- .build();
- nrOfRows.set(JdbcCommon.convertToAvroStream(resultSet, out, options, null));
- } catch (final SQLException e) {
- throw new ProcessException(e);
- }
- });
-
- long fetchTimeElapsed = fetchTime.getElapsed(TimeUnit.MILLISECONDS);
- // set attribute how many rows were selected
- resultSetFF = session.putAttribute(resultSetFF, RESULT_ROW_COUNT, String.valueOf(nrOfRows.get()));
- resultSetFF = session.putAttribute(resultSetFF, RESULT_QUERY_DURATION, String.valueOf(executionTimeElapsed + fetchTimeElapsed));
- resultSetFF = session.putAttribute(resultSetFF, RESULT_QUERY_EXECUTION_TIME, String.valueOf(executionTimeElapsed));
- resultSetFF = session.putAttribute(resultSetFF, RESULT_QUERY_FETCH_TIME, String.valueOf(fetchTimeElapsed));
- resultSetFF = session.putAttribute(resultSetFF, CoreAttributes.MIME_TYPE.key(), JdbcCommon.MIME_TYPE_AVRO_BINARY);
- resultSetFF = session.putAttribute(resultSetFF, RESULTSET_INDEX, String.valueOf(resultCount));
-
- logger.info("{} contains {} Avro records; transferring to 'success'",
- new Object[]{resultSetFF, nrOfRows.get()});
- session.getProvenanceReporter().modifyContent(resultSetFF, "Retrieved " + nrOfRows.get() + " rows", executionTimeElapsed + fetchTimeElapsed);
- session.transfer(resultSetFF, REL_SUCCESS);
+ try {
+ final ResultSet resultSet = st.getResultSet();
+ final JdbcCommon.AvroConversionOptions options = JdbcCommon.AvroConversionOptions.builder()
+ .convertNames(convertNamesForAvro)
+ .useLogicalTypes(useAvroLogicalTypes)
+ .defaultPrecision(defaultPrecision)
+ .defaultScale(defaultScale)
+ .maxRows(maxRowsPerFlowFile)
+ .build();
+
+ do {
+ final StopWatch fetchTime = new StopWatch(true);
+
+ FlowFile resultSetFF;
+ if (fileToProcess == null) {
+ resultSetFF = session.create();
+ } else {
+ resultSetFF = session.create(fileToProcess);
+ resultSetFF = session.putAllAttributes(resultSetFF, fileToProcess.getAttributes());
+ }
+
+ resultSetFF = session.write(resultSetFF, out -> {
+ try {
+ nrOfRows.set(JdbcCommon.convertToAvroStream(resultSet, out, options, null));
+ } catch (SQLException e) {
+ throw new ProcessException(e);
+ }
+ });
+
+ long fetchTimeElapsed = fetchTime.getElapsed(TimeUnit.MILLISECONDS);
+
+ // set attribute how many rows were selected
+ resultSetFF = session.putAttribute(resultSetFF, RESULT_ROW_COUNT, String.valueOf(nrOfRows.get()));
+ resultSetFF = session.putAttribute(resultSetFF, RESULT_QUERY_DURATION, String.valueOf(executionTimeElapsed + fetchTimeElapsed));
+ resultSetFF = session.putAttribute(resultSetFF, RESULT_QUERY_EXECUTION_TIME, String.valueOf(executionTimeElapsed));
+ resultSetFF = session.putAttribute(resultSetFF, RESULT_QUERY_FETCH_TIME, String.valueOf(fetchTimeElapsed));
+ resultSetFF = session.putAttribute(resultSetFF, CoreAttributes.MIME_TYPE.key(), JdbcCommon.MIME_TYPE_AVRO_BINARY);
+ resultSetFF = session.putAttribute(resultSetFF, RESULTSET_INDEX, String.valueOf(resultCount));
+
+ // if fragmented ResultSet, determine if we should keep this fragment; set fragment attributes
+ if(maxRowsPerFlowFile > 0) {
+ // if row count is zero and this is not the first fragment, drop it instead of committing it.
+ if(nrOfRows.get() == 0 && fragmentIndex > 0) {
+ session.remove(resultSetFF);
+ break;
+ }
+
+ resultSetFF = session.putAttribute(resultSetFF, FRAGMENT_ID, fragmentId);
+ resultSetFF = session.putAttribute(resultSetFF, FRAGMENT_INDEX, String.valueOf(fragmentIndex));
+ }
+
+ logger.info("{} contains {} Avro records; transferring to 'success'",
+ new Object[]{resultSetFF, nrOfRows.get()});
+ session.getProvenanceReporter().modifyContent(resultSetFF, "Retrieved " + nrOfRows.get() + " rows", executionTimeElapsed + fetchTimeElapsed);
+ resultSetFlowFiles.add(resultSetFF);
+
+ // If we've reached the batch size, send out the flow files
+ if (outputBatchSize > 0 && resultSetFlowFiles.size() >= outputBatchSize) {
+ session.transfer(resultSetFlowFiles, REL_SUCCESS);
+ session.commit();
+ resultSetFlowFiles.clear();
+ }
+
+ fragmentIndex++;
+ } while (maxRowsPerFlowFile > 0 && nrOfRows.get() == maxRowsPerFlowFile);
+
+ // If we are splitting results but not outputting batches, set count on all FlowFiles
+ if (outputBatchSize == 0 && maxRowsPerFlowFile > 0) {
+ for (int i = 0; i < resultSetFlowFiles.size(); i++) {
+ resultSetFlowFiles.set(i,
+ session.putAttribute(resultSetFlowFiles.get(i), FRAGMENT_COUNT, Integer.toString(fragmentIndex)));
+ }
+ }
+ } catch (final SQLException e) {
+ throw new ProcessException(e);
+ }
resultCount++;
}
@@ -293,6 +382,10 @@ public class ExecuteSQL extends AbstractProcessor {
}
}
+ // Transfer any remaining files to SUCCESS
+ session.transfer(resultSetFlowFiles, REL_SUCCESS);
+ resultSetFlowFiles.clear();
+
//If we had at least one result then it's OK to drop the original file, but if we had no results then
// pass the original flow file down the line to trigger downstream processors
if(fileToProcess != null){
http://git-wip-us.apache.org/repos/asf/nifi/blob/38265365/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryDatabaseTable.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryDatabaseTable.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryDatabaseTable.java
index b245f57..5c210c4 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryDatabaseTable.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryDatabaseTable.java
@@ -38,6 +38,7 @@ import org.apache.nifi.expression.AttributeExpression;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.flowfile.attributes.FragmentAttributes;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
@@ -116,6 +117,9 @@ public class QueryDatabaseTable extends AbstractDatabaseFetchProcessor {
public static final String RESULT_TABLENAME = "tablename";
public static final String RESULT_ROW_COUNT = "querydbtable.row.count";
+ public static final String FRAGMENT_ID = FragmentAttributes.FRAGMENT_ID.key();
+ public static final String FRAGMENT_INDEX = FragmentAttributes.FRAGMENT_INDEX.key();
+
public static final PropertyDescriptor FETCH_SIZE = new PropertyDescriptor.Builder()
.name("Fetch Size")
.description("The number of result rows to be fetched from the result set at a time. This is a hint to the database driver and may not be "
@@ -355,8 +359,8 @@ public class QueryDatabaseTable extends AbstractDatabaseFetchProcessor {
fileToProcess = session.putAttribute(fileToProcess, RESULT_TABLENAME, tableName);
fileToProcess = session.putAttribute(fileToProcess, CoreAttributes.MIME_TYPE.key(), JdbcCommon.MIME_TYPE_AVRO_BINARY);
if(maxRowsPerFlowFile > 0) {
- fileToProcess = session.putAttribute(fileToProcess, "fragment.identifier", fragmentIdentifier);
- fileToProcess = session.putAttribute(fileToProcess, "fragment.index", String.valueOf(fragmentIndex));
+ fileToProcess = session.putAttribute(fileToProcess, FRAGMENT_ID, fragmentIdentifier);
+ fileToProcess = session.putAttribute(fileToProcess, FRAGMENT_INDEX, String.valueOf(fragmentIndex));
}
logger.info("{} contains {} Avro records; transferring to 'success'",
@@ -373,7 +377,10 @@ public class QueryDatabaseTable extends AbstractDatabaseFetchProcessor {
} else {
// If there were no rows returned, don't send the flowfile
session.remove(fileToProcess);
- context.yield();
+ // If no rows and this was first FlowFile, yield
+ if(fragmentIndex == 0){
+ context.yield();
+ }
break;
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/38265365/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/JdbcCommon.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/JdbcCommon.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/JdbcCommon.java
index 260a94a..166a81c 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/JdbcCommon.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/JdbcCommon.java
@@ -176,7 +176,6 @@ public class JdbcCommon {
.required(true)
.build();
-
public static long convertToAvroStream(final ResultSet rs, final OutputStream outStream, boolean convertNames) throws SQLException, IOException {
return convertToAvroStream(rs, outStream, null, null, convertNames);
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/38265365/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExecuteSQL.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExecuteSQL.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExecuteSQL.java
index a5d8f45..9405bd0 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExecuteSQL.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExecuteSQL.java
@@ -35,6 +35,7 @@ import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.DatumReader;
import org.apache.nifi.controller.AbstractControllerService;
import org.apache.nifi.dbcp.DBCPService;
+import org.apache.nifi.flowfile.attributes.FragmentAttributes;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processors.standard.util.TestJdbcHugeStream;
import org.apache.nifi.reporting.InitializationException;
@@ -193,6 +194,96 @@ public class TestExecuteSQL {
}
@Test
+ public void testWithOutputBatching() throws SQLException {
+ // remove previous test database, if any
+ final File dbLocation = new File(DB_LOCATION);
+ dbLocation.delete();
+
+ // load test data to database
+ final Connection con = ((DBCPService) runner.getControllerService("dbcp")).getConnection();
+ Statement stmt = con.createStatement();
+
+ try {
+ stmt.execute("drop table TEST_NULL_INT");
+ } catch (final SQLException sqle) {
+ }
+
+ stmt.execute("create table TEST_NULL_INT (id integer not null, val1 integer, val2 integer, constraint my_pk primary key (id))");
+
+ for (int i = 0; i < 1000; i++) {
+ stmt.execute("insert into TEST_NULL_INT (id, val1, val2) VALUES (" + i + ", 1, 1)");
+ }
+
+ runner.setIncomingConnection(false);
+ runner.setProperty(ExecuteSQL.MAX_ROWS_PER_FLOW_FILE, "5");
+ runner.setProperty(ExecuteSQL.OUTPUT_BATCH_SIZE, "5");
+ runner.setProperty(ExecuteSQL.SQL_SELECT_QUERY, "SELECT * FROM TEST_NULL_INT");
+ runner.run();
+
+ runner.assertAllFlowFilesTransferred(ExecuteSQL.REL_SUCCESS, 200);
+ runner.assertAllFlowFilesContainAttribute(ExecuteSQL.REL_SUCCESS, FragmentAttributes.FRAGMENT_INDEX.key());
+ runner.assertAllFlowFilesContainAttribute(ExecuteSQL.REL_SUCCESS, FragmentAttributes.FRAGMENT_ID.key());
+
+ MockFlowFile firstFlowFile = runner.getFlowFilesForRelationship(ExecuteSQL.REL_SUCCESS).get(0);
+
+ firstFlowFile.assertAttributeEquals(ExecuteSQL.RESULT_ROW_COUNT, "5");
+ firstFlowFile.assertAttributeNotExists(FragmentAttributes.FRAGMENT_COUNT.key());
+ firstFlowFile.assertAttributeEquals(FragmentAttributes.FRAGMENT_INDEX.key(), "0");
+ firstFlowFile.assertAttributeEquals(ExecuteSQL.RESULTSET_INDEX, "0");
+
+ MockFlowFile lastFlowFile = runner.getFlowFilesForRelationship(ExecuteSQL.REL_SUCCESS).get(199);
+
+ lastFlowFile.assertAttributeEquals(ExecuteSQL.RESULT_ROW_COUNT, "5");
+ lastFlowFile.assertAttributeEquals(FragmentAttributes.FRAGMENT_INDEX.key(), "199");
+ lastFlowFile.assertAttributeEquals(ExecuteSQL.RESULTSET_INDEX, "0");
+ }
+
+ @Test
+ public void testMaxRowsPerFlowFile() throws SQLException {
+ // remove previous test database, if any
+ final File dbLocation = new File(DB_LOCATION);
+ dbLocation.delete();
+
+ // load test data to database
+ final Connection con = ((DBCPService) runner.getControllerService("dbcp")).getConnection();
+ Statement stmt = con.createStatement();
+
+ try {
+ stmt.execute("drop table TEST_NULL_INT");
+ } catch (final SQLException sqle) {
+ }
+
+ stmt.execute("create table TEST_NULL_INT (id integer not null, val1 integer, val2 integer, constraint my_pk primary key (id))");
+
+ for (int i = 0; i < 1000; i++) {
+ stmt.execute("insert into TEST_NULL_INT (id, val1, val2) VALUES (" + i + ", 1, 1)");
+ }
+
+ runner.setIncomingConnection(false);
+ runner.setProperty(ExecuteSQL.MAX_ROWS_PER_FLOW_FILE, "5");
+ runner.setProperty(ExecuteSQL.OUTPUT_BATCH_SIZE, "0");
+ runner.setProperty(ExecuteSQL.SQL_SELECT_QUERY, "SELECT * FROM TEST_NULL_INT");
+ runner.run();
+
+ runner.assertAllFlowFilesTransferred(ExecuteSQL.REL_SUCCESS, 200);
+ runner.assertAllFlowFilesContainAttribute(ExecuteSQL.REL_SUCCESS, FragmentAttributes.FRAGMENT_INDEX.key());
+ runner.assertAllFlowFilesContainAttribute(ExecuteSQL.REL_SUCCESS, FragmentAttributes.FRAGMENT_ID.key());
+ runner.assertAllFlowFilesContainAttribute(ExecuteSQL.REL_SUCCESS, FragmentAttributes.FRAGMENT_COUNT.key());
+
+ MockFlowFile firstFlowFile = runner.getFlowFilesForRelationship(ExecuteSQL.REL_SUCCESS).get(0);
+
+ firstFlowFile.assertAttributeEquals(ExecuteSQL.RESULT_ROW_COUNT, "5");
+ firstFlowFile.assertAttributeEquals(FragmentAttributes.FRAGMENT_INDEX.key(), "0");
+ firstFlowFile.assertAttributeEquals(ExecuteSQL.RESULTSET_INDEX, "0");
+
+ MockFlowFile lastFlowFile = runner.getFlowFilesForRelationship(ExecuteSQL.REL_SUCCESS).get(199);
+
+ lastFlowFile.assertAttributeEquals(ExecuteSQL.RESULT_ROW_COUNT, "5");
+ lastFlowFile.assertAttributeEquals(FragmentAttributes.FRAGMENT_INDEX.key(), "199");
+ lastFlowFile.assertAttributeEquals(ExecuteSQL.RESULTSET_INDEX, "0");
+ }
+
+ @Test
public void testInsertStatementCreatesFlowFile() throws SQLException {
// remove previous test database, if any
final File dbLocation = new File(DB_LOCATION);