Posted to commits@nifi.apache.org by ma...@apache.org on 2018/07/09 17:38:00 UTC

nifi git commit: NIFI-1251 ExecuteSQL Max Rows and Output Batching

Repository: nifi
Updated Branches:
  refs/heads/master 12e384b4b -> 382653654


NIFI-1251 ExecuteSQL Max Rows and Output Batching

NIFI-1251: Fixed doc and checkstyle issues
Signed-off-by: Matthew Burgess <ma...@apache.org>

This closes #2834


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/38265365
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/38265365
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/38265365

Branch: refs/heads/master
Commit: 382653654652ac6677b6171a48018f648952428b
Parents: 12e384b
Author: patricker <pa...@gmail.com>
Authored: Fri Jun 29 14:10:14 2018 -0600
Committer: Matthew Burgess <ma...@apache.org>
Committed: Mon Jul 9 13:36:45 2018 -0400

----------------------------------------------------------------------
 .../nifi/processors/standard/ExecuteSQL.java    | 173 ++++++++++++++-----
 .../processors/standard/QueryDatabaseTable.java |  13 +-
 .../processors/standard/util/JdbcCommon.java    |   1 -
 .../processors/standard/TestExecuteSQL.java     |  91 ++++++++++
 4 files changed, 234 insertions(+), 44 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/38265365/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteSQL.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteSQL.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteSQL.java
index 66c2338..6cd3ae6 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteSQL.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteSQL.java
@@ -27,6 +27,7 @@ import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
+import java.util.UUID;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 
@@ -46,6 +47,7 @@ import org.apache.nifi.dbcp.DBCPService;
 import org.apache.nifi.expression.ExpressionLanguageScope;
 import org.apache.nifi.flowfile.FlowFile;
 import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.flowfile.attributes.FragmentAttributes;
 import org.apache.nifi.logging.ComponentLog;
 import org.apache.nifi.processor.AbstractProcessor;
 import org.apache.nifi.processor.ProcessContext;
@@ -96,7 +98,17 @@ import static org.apache.nifi.processors.standard.util.JdbcCommon.USE_AVRO_LOGIC
     @WritesAttribute(attribute="executesql.query.executiontime", description = "Duration of the query execution time in milliseconds"),
     @WritesAttribute(attribute="executesql.query.fetchtime", description = "Duration of the result set fetch time in milliseconds"),
     @WritesAttribute(attribute="executesql.resultset.index", description = "Assuming multiple result sets are returned, "
-       + "the zero based index of this result set.")
+       + "the zero based index of this result set."),
+    @WritesAttribute(attribute="fragment.identifier", description="If 'Max Rows Per Flow File' is set then all FlowFiles from the same query result set "
+            + "will have the same value for the fragment.identifier attribute. This can then be used to correlate the results."),
+    @WritesAttribute(attribute = "fragment.count", description = "If 'Max Rows Per Flow File' is set then this is the total number of  "
+            + "FlowFiles produced by a single ResultSet. This can be used in conjunction with the "
+            + "fragment.identifier attribute in order to know how many FlowFiles belonged to the same incoming ResultSet. If Output Batch Size is set, then this "
+            + "attribute will not be populated."),
+    @WritesAttribute(attribute="fragment.index", description="If 'Max Rows Per Flow File' is set then the position of this FlowFile in the list of "
+            + "outgoing FlowFiles that were all derived from the same result set FlowFile. This can be "
+            + "used in conjunction with the fragment.identifier attribute to know which FlowFiles originated from the same query result set and in what order  "
+                + "FlowFiles were produced")
 })
 public class ExecuteSQL extends AbstractProcessor {
 
@@ -106,6 +118,10 @@ public class ExecuteSQL extends AbstractProcessor {
     public static final String RESULT_QUERY_FETCH_TIME = "executesql.query.fetchtime";
     public static final String RESULTSET_INDEX = "executesql.resultset.index";
 
+    public static final String FRAGMENT_ID = FragmentAttributes.FRAGMENT_ID.key();
+    public static final String FRAGMENT_INDEX = FragmentAttributes.FRAGMENT_INDEX.key();
+    public static final String FRAGMENT_COUNT = FragmentAttributes.FRAGMENT_COUNT.key();
+
     // Relationships
     public static final Relationship REL_SUCCESS = new Relationship.Builder()
             .name("success")
@@ -146,6 +162,31 @@ public class ExecuteSQL extends AbstractProcessor {
             .sensitive(false)
             .build();
 
+    public static final PropertyDescriptor MAX_ROWS_PER_FLOW_FILE = new PropertyDescriptor.Builder()
+            .name("esql-max-rows")
+            .displayName("Max Rows Per Flow File")
+            .description("The maximum number of result rows that will be included in a single FlowFile. This will allow you to break up very large "
+                    + "result sets into multiple FlowFiles. If the value specified is zero, then all rows are returned in a single FlowFile.")
+            .defaultValue("0")
+            .required(true)
+            .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .build();
+
+    public static final PropertyDescriptor OUTPUT_BATCH_SIZE = new PropertyDescriptor.Builder()
+            .name("esql-output-batch-size")
+            .displayName("Output Batch Size")
+            .description("The number of output FlowFiles to queue before committing the process session. When set to zero, the session will be committed when all result set rows "
+                    + "have been processed and the output FlowFiles are ready for transfer to the downstream relationship. For large result sets, this can cause a large burst of FlowFiles "
+                    + "to be transferred at the end of processor execution. If this property is set, then when the specified number of FlowFiles are ready for transfer, then the session will "
+                    + "be committed, thus releasing the FlowFiles to the downstream relationship. NOTE: The fragment.count attribute will not be set on FlowFiles when this "
+                    + "property is set.")
+            .defaultValue("0")
+            .required(true)
+            .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .build();
+
     private final List<PropertyDescriptor> propDescriptors;
 
     public ExecuteSQL() {
@@ -162,6 +203,8 @@ public class ExecuteSQL extends AbstractProcessor {
         pds.add(USE_AVRO_LOGICAL_TYPES);
         pds.add(DEFAULT_PRECISION);
         pds.add(DEFAULT_SCALE);
+        pds.add(MAX_ROWS_PER_FLOW_FILE);
+        pds.add(OUTPUT_BATCH_SIZE);
         propDescriptors = Collections.unmodifiableList(pds);
     }
 
@@ -200,11 +243,16 @@ public class ExecuteSQL extends AbstractProcessor {
             }
         }
 
+        final List<FlowFile> resultSetFlowFiles = new ArrayList<>();
+
         final ComponentLog logger = getLogger();
         final DBCPService dbcpService = context.getProperty(DBCP_SERVICE).asControllerService(DBCPService.class);
         final Integer queryTimeout = context.getProperty(QUERY_TIMEOUT).asTimePeriod(TimeUnit.SECONDS).intValue();
         final boolean convertNamesForAvro = context.getProperty(NORMALIZE_NAMES_FOR_AVRO).asBoolean();
         final Boolean useAvroLogicalTypes = context.getProperty(USE_AVRO_LOGICAL_TYPES).asBoolean();
+        final Integer maxRowsPerFlowFile = context.getProperty(MAX_ROWS_PER_FLOW_FILE).evaluateAttributeExpressions().asInteger();
+        final Integer outputBatchSizeField = context.getProperty(OUTPUT_BATCH_SIZE).evaluateAttributeExpressions().asInteger();
+        final int outputBatchSize = outputBatchSizeField == null ? 0 : outputBatchSizeField;
         final Integer defaultPrecision = context.getProperty(DEFAULT_PRECISION).evaluateAttributeExpressions(fileToProcess).asInteger();
         final Integer defaultScale = context.getProperty(DEFAULT_SCALE).evaluateAttributeExpressions(fileToProcess).asInteger();
         final String selectQuery;
@@ -227,6 +275,10 @@ public class ExecuteSQL extends AbstractProcessor {
                 JdbcCommon.setParameters(st, fileToProcess.getAttributes());
             }
             logger.debug("Executing query {}", new Object[]{selectQuery});
+
+            int fragmentIndex=0;
+            final String fragmentId = UUID.randomUUID().toString();
+
             final StopWatch executionTime = new StopWatch(true);
 
             boolean hasResults = st.execute();
@@ -238,47 +290,84 @@ public class ExecuteSQL extends AbstractProcessor {
             while(hasResults || hasUpdateCount) {
                 //getMoreResults() and execute() return false to indicate that the result of the statement is just a number and not a ResultSet
                 if (hasResults) {
-                    FlowFile resultSetFF;
-                    if (fileToProcess == null) {
-                        resultSetFF = session.create();
-                    } else {
-                        resultSetFF = session.create(fileToProcess);
-                        resultSetFF = session.putAllAttributes(resultSetFF, fileToProcess.getAttributes());
-                    }
-
-                    final StopWatch fetchTime = new StopWatch(true);
-
                     final AtomicLong nrOfRows = new AtomicLong(0L);
-                    resultSetFF = session.write(resultSetFF, out -> {
-                        try {
-
-                            final ResultSet resultSet = st.getResultSet();
-                            final JdbcCommon.AvroConversionOptions options = JdbcCommon.AvroConversionOptions.builder()
-                                    .convertNames(convertNamesForAvro)
-                                    .useLogicalTypes(useAvroLogicalTypes)
-                                    .defaultPrecision(defaultPrecision)
-                                    .defaultScale(defaultScale)
-                                    .build();
-                            nrOfRows.set(JdbcCommon.convertToAvroStream(resultSet, out, options, null));
-                        } catch (final SQLException e) {
-                            throw new ProcessException(e);
-                        }
-                    });
-
-                    long fetchTimeElapsed = fetchTime.getElapsed(TimeUnit.MILLISECONDS);
 
-                    // set attribute how many rows were selected
-                    resultSetFF = session.putAttribute(resultSetFF, RESULT_ROW_COUNT, String.valueOf(nrOfRows.get()));
-                    resultSetFF = session.putAttribute(resultSetFF, RESULT_QUERY_DURATION, String.valueOf(executionTimeElapsed + fetchTimeElapsed));
-                    resultSetFF = session.putAttribute(resultSetFF, RESULT_QUERY_EXECUTION_TIME, String.valueOf(executionTimeElapsed));
-                    resultSetFF = session.putAttribute(resultSetFF, RESULT_QUERY_FETCH_TIME, String.valueOf(fetchTimeElapsed));
-                    resultSetFF = session.putAttribute(resultSetFF, CoreAttributes.MIME_TYPE.key(), JdbcCommon.MIME_TYPE_AVRO_BINARY);
-                    resultSetFF = session.putAttribute(resultSetFF, RESULTSET_INDEX, String.valueOf(resultCount));
-
-                    logger.info("{} contains {} Avro records; transferring to 'success'",
-                            new Object[]{resultSetFF, nrOfRows.get()});
-                    session.getProvenanceReporter().modifyContent(resultSetFF, "Retrieved " + nrOfRows.get() + " rows", executionTimeElapsed + fetchTimeElapsed);
-                    session.transfer(resultSetFF, REL_SUCCESS);
+                    try {
+                        final ResultSet resultSet = st.getResultSet();
+                        final JdbcCommon.AvroConversionOptions options = JdbcCommon.AvroConversionOptions.builder()
+                                .convertNames(convertNamesForAvro)
+                                .useLogicalTypes(useAvroLogicalTypes)
+                                .defaultPrecision(defaultPrecision)
+                                .defaultScale(defaultScale)
+                                .maxRows(maxRowsPerFlowFile)
+                                .build();
+
+                        do {
+                            final StopWatch fetchTime = new StopWatch(true);
+
+                            FlowFile resultSetFF;
+                            if (fileToProcess == null) {
+                                resultSetFF = session.create();
+                            } else {
+                                resultSetFF = session.create(fileToProcess);
+                                resultSetFF = session.putAllAttributes(resultSetFF, fileToProcess.getAttributes());
+                            }
+
+                            resultSetFF = session.write(resultSetFF, out -> {
+                                try {
+                                    nrOfRows.set(JdbcCommon.convertToAvroStream(resultSet, out, options, null));
+                                } catch (SQLException e) {
+                                    throw new ProcessException(e);
+                                }
+                            });
+
+                            long fetchTimeElapsed = fetchTime.getElapsed(TimeUnit.MILLISECONDS);
+
+                            // set attribute how many rows were selected
+                            resultSetFF = session.putAttribute(resultSetFF, RESULT_ROW_COUNT, String.valueOf(nrOfRows.get()));
+                            resultSetFF = session.putAttribute(resultSetFF, RESULT_QUERY_DURATION, String.valueOf(executionTimeElapsed + fetchTimeElapsed));
+                            resultSetFF = session.putAttribute(resultSetFF, RESULT_QUERY_EXECUTION_TIME, String.valueOf(executionTimeElapsed));
+                            resultSetFF = session.putAttribute(resultSetFF, RESULT_QUERY_FETCH_TIME, String.valueOf(fetchTimeElapsed));
+                            resultSetFF = session.putAttribute(resultSetFF, CoreAttributes.MIME_TYPE.key(), JdbcCommon.MIME_TYPE_AVRO_BINARY);
+                            resultSetFF = session.putAttribute(resultSetFF, RESULTSET_INDEX, String.valueOf(resultCount));
+
+                            // if fragmented ResultSet, determine if we should keep this fragment; set fragment attributes
+                            if(maxRowsPerFlowFile > 0) {
+                                // if row count is zero and this is not the first fragment, drop it instead of committing it.
+                                if(nrOfRows.get() == 0 && fragmentIndex > 0) {
+                                    session.remove(resultSetFF);
+                                    break;
+                                }
+
+                                resultSetFF = session.putAttribute(resultSetFF, FRAGMENT_ID, fragmentId);
+                                resultSetFF = session.putAttribute(resultSetFF, FRAGMENT_INDEX, String.valueOf(fragmentIndex));
+                            }
+
+                            logger.info("{} contains {} Avro records; transferring to 'success'",
+                                    new Object[]{resultSetFF, nrOfRows.get()});
+                            session.getProvenanceReporter().modifyContent(resultSetFF, "Retrieved " + nrOfRows.get() + " rows", executionTimeElapsed + fetchTimeElapsed);
+                            resultSetFlowFiles.add(resultSetFF);
+
+                            // If we've reached the batch size, send out the flow files
+                            if (outputBatchSize > 0 && resultSetFlowFiles.size() >= outputBatchSize) {
+                                session.transfer(resultSetFlowFiles, REL_SUCCESS);
+                                session.commit();
+                                resultSetFlowFiles.clear();
+                            }
+
+                            fragmentIndex++;
+                        } while (maxRowsPerFlowFile > 0 && nrOfRows.get() == maxRowsPerFlowFile);
+
+                        // If we are splitting results but not outputting batches, set count on all FlowFiles
+                        if (outputBatchSize == 0 && maxRowsPerFlowFile > 0) {
+                            for (int i = 0; i < resultSetFlowFiles.size(); i++) {
+                                resultSetFlowFiles.set(i,
+                                        session.putAttribute(resultSetFlowFiles.get(i), FRAGMENT_COUNT, Integer.toString(fragmentIndex)));
+                            }
+                        }
+                    } catch (final SQLException e) {
+                        throw new ProcessException(e);
+                    }
 
                     resultCount++;
                 }
@@ -293,6 +382,10 @@ public class ExecuteSQL extends AbstractProcessor {
                 }
             }
 
+            // Transfer any remaining files to SUCCESS
+            session.transfer(resultSetFlowFiles, REL_SUCCESS);
+            resultSetFlowFiles.clear();
+
             //If we had at least one result then it's OK to drop the original file, but if we had no results then
             //  pass the original flow file down the line to trigger downstream processors
             if(fileToProcess != null){
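
The do/while loop added above splits a single result set into FlowFiles of at most 'Max Rows Per Flow File' rows each and, when 'Output Batch Size' is non-zero, transfers and commits every N FlowFiles instead of holding them all until the query finishes. The following is a standalone sketch of that control flow, not the NiFi API: "emitting" and "committing" FlowFiles are simulated with println calls, and a simple row counter stands in for JdbcCommon.convertToAvroStream(...) honoring the maxRows option.

    import java.util.ArrayList;
    import java.util.List;
    import java.util.UUID;

    // Standalone sketch of the new fragment/batch control flow in ExecuteSQL.
    public class FragmentBatchSketch {

        public static void main(String[] args) {
            run(23, 5, 2); // 23 rows, 5 rows per fragment, commit every 2 fragments
        }

        static void run(int totalRows, int maxRowsPerFlowFile, int outputBatchSize) {
            final String fragmentId = UUID.randomUUID().toString();
            final List<String> pending = new ArrayList<>();
            int remaining = totalRows;
            int fragmentIndex = 0;
            int rowsInFragment;
            do {
                // Write at most maxRowsPerFlowFile rows into this fragment (0 means all rows).
                rowsInFragment = (maxRowsPerFlowFile > 0) ? Math.min(maxRowsPerFlowFile, remaining) : remaining;
                remaining -= rowsInFragment;

                // A trailing zero-row fragment is dropped, mirroring session.remove() above.
                if (maxRowsPerFlowFile > 0 && rowsInFragment == 0 && fragmentIndex > 0) {
                    break;
                }

                pending.add("fragment.identifier=" + fragmentId
                        + " fragment.index=" + fragmentIndex
                        + " rows=" + rowsInFragment);

                // Output Batch Size > 0: release FlowFiles downstream before the
                // whole result set has been processed.
                if (outputBatchSize > 0 && pending.size() >= outputBatchSize) {
                    pending.forEach(f -> System.out.println("commit early: " + f));
                    pending.clear();
                }
                fragmentIndex++;
            } while (maxRowsPerFlowFile > 0 && rowsInFragment == maxRowsPerFlowFile);

            // No early batching: the final fragment count is known, so stamp it on
            // every pending fragment (fragment.count) before the single commit at the end.
            if (outputBatchSize == 0 && maxRowsPerFlowFile > 0) {
                for (int i = 0; i < pending.size(); i++) {
                    pending.set(i, pending.get(i) + " fragment.count=" + fragmentIndex);
                }
            }
            pending.forEach(f -> System.out.println("commit at end: " + f));
        }
    }

Running run(20, 5, 0), for example, shows the trailing zero-row fragment being dropped and fragment.count=4 stamped on all four fragments before the final transfer, mirroring the session.remove() and FRAGMENT_COUNT handling above.
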

http://git-wip-us.apache.org/repos/asf/nifi/blob/38265365/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryDatabaseTable.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryDatabaseTable.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryDatabaseTable.java
index b245f57..5c210c4 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryDatabaseTable.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryDatabaseTable.java
@@ -38,6 +38,7 @@ import org.apache.nifi.expression.AttributeExpression;
 import org.apache.nifi.expression.ExpressionLanguageScope;
 import org.apache.nifi.flowfile.FlowFile;
 import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.flowfile.attributes.FragmentAttributes;
 import org.apache.nifi.logging.ComponentLog;
 import org.apache.nifi.processor.ProcessContext;
 import org.apache.nifi.processor.ProcessSession;
@@ -116,6 +117,9 @@ public class QueryDatabaseTable extends AbstractDatabaseFetchProcessor {
     public static final String RESULT_TABLENAME = "tablename";
     public static final String RESULT_ROW_COUNT = "querydbtable.row.count";
 
+    public static final String FRAGMENT_ID = FragmentAttributes.FRAGMENT_ID.key();
+    public static final String FRAGMENT_INDEX = FragmentAttributes.FRAGMENT_INDEX.key();
+
     public static final PropertyDescriptor FETCH_SIZE = new PropertyDescriptor.Builder()
             .name("Fetch Size")
             .description("The number of result rows to be fetched from the result set at a time. This is a hint to the database driver and may not be "
@@ -355,8 +359,8 @@ public class QueryDatabaseTable extends AbstractDatabaseFetchProcessor {
                         fileToProcess = session.putAttribute(fileToProcess, RESULT_TABLENAME, tableName);
                         fileToProcess = session.putAttribute(fileToProcess, CoreAttributes.MIME_TYPE.key(), JdbcCommon.MIME_TYPE_AVRO_BINARY);
                         if(maxRowsPerFlowFile > 0) {
-                            fileToProcess = session.putAttribute(fileToProcess, "fragment.identifier", fragmentIdentifier);
-                            fileToProcess = session.putAttribute(fileToProcess, "fragment.index", String.valueOf(fragmentIndex));
+                            fileToProcess = session.putAttribute(fileToProcess, FRAGMENT_ID, fragmentIdentifier);
+                            fileToProcess = session.putAttribute(fileToProcess, FRAGMENT_INDEX, String.valueOf(fragmentIndex));
                         }
 
                         logger.info("{} contains {} Avro records; transferring to 'success'",
@@ -373,7 +377,10 @@ public class QueryDatabaseTable extends AbstractDatabaseFetchProcessor {
                     } else {
                         // If there were no rows returned, don't send the flowfile
                         session.remove(fileToProcess);
-                        context.yield();
+                        // If no rows and this was first FlowFile, yield
+                        if(fragmentIndex == 0){
+                            context.yield();
+                        }
                         break;
                     }
 

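For reference, the FragmentAttributes constants swapped in above resolve to the same attribute names documented on ExecuteSQL. A quick sanity check, assuming the standard org.apache.nifi.flowfile.attributes.FragmentAttributes enum that ships with NiFi:

    import org.apache.nifi.flowfile.attributes.FragmentAttributes;

    // Prints the attribute keys that QueryDatabaseTable previously hard-coded as string literals.
    public class FragmentKeyCheck {
        public static void main(String[] args) {
            System.out.println(FragmentAttributes.FRAGMENT_ID.key());    // fragment.identifier
            System.out.println(FragmentAttributes.FRAGMENT_INDEX.key()); // fragment.index
            System.out.println(FragmentAttributes.FRAGMENT_COUNT.key()); // fragment.count
        }
    }
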
http://git-wip-us.apache.org/repos/asf/nifi/blob/38265365/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/JdbcCommon.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/JdbcCommon.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/JdbcCommon.java
index 260a94a..166a81c 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/JdbcCommon.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/JdbcCommon.java
@@ -176,7 +176,6 @@ public class JdbcCommon {
             .required(true)
             .build();
 
-
     public static long convertToAvroStream(final ResultSet rs, final OutputStream outStream, boolean convertNames) throws SQLException, IOException {
         return convertToAvroStream(rs, outStream, null, null, convertNames);
     }

http://git-wip-us.apache.org/repos/asf/nifi/blob/38265365/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExecuteSQL.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExecuteSQL.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExecuteSQL.java
index a5d8f45..9405bd0 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExecuteSQL.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExecuteSQL.java
@@ -35,6 +35,7 @@ import org.apache.avro.generic.GenericRecord;
 import org.apache.avro.io.DatumReader;
 import org.apache.nifi.controller.AbstractControllerService;
 import org.apache.nifi.dbcp.DBCPService;
+import org.apache.nifi.flowfile.attributes.FragmentAttributes;
 import org.apache.nifi.processor.exception.ProcessException;
 import org.apache.nifi.processors.standard.util.TestJdbcHugeStream;
 import org.apache.nifi.reporting.InitializationException;
@@ -193,6 +194,96 @@ public class TestExecuteSQL {
     }
 
     @Test
+    public void testWithOutputBatching() throws SQLException {
+        // remove previous test database, if any
+        final File dbLocation = new File(DB_LOCATION);
+        dbLocation.delete();
+
+        // load test data to database
+        final Connection con = ((DBCPService) runner.getControllerService("dbcp")).getConnection();
+        Statement stmt = con.createStatement();
+
+        try {
+            stmt.execute("drop table TEST_NULL_INT");
+        } catch (final SQLException sqle) {
+        }
+
+        stmt.execute("create table TEST_NULL_INT (id integer not null, val1 integer, val2 integer, constraint my_pk primary key (id))");
+
+        for (int i = 0; i < 1000; i++) {
+            stmt.execute("insert into TEST_NULL_INT (id, val1, val2) VALUES (" + i + ", 1, 1)");
+        }
+
+        runner.setIncomingConnection(false);
+        runner.setProperty(ExecuteSQL.MAX_ROWS_PER_FLOW_FILE, "5");
+        runner.setProperty(ExecuteSQL.OUTPUT_BATCH_SIZE, "5");
+        runner.setProperty(ExecuteSQL.SQL_SELECT_QUERY, "SELECT * FROM TEST_NULL_INT");
+        runner.run();
+
+        runner.assertAllFlowFilesTransferred(ExecuteSQL.REL_SUCCESS, 200);
+        runner.assertAllFlowFilesContainAttribute(ExecuteSQL.REL_SUCCESS, FragmentAttributes.FRAGMENT_INDEX.key());
+        runner.assertAllFlowFilesContainAttribute(ExecuteSQL.REL_SUCCESS, FragmentAttributes.FRAGMENT_ID.key());
+
+        MockFlowFile firstFlowFile = runner.getFlowFilesForRelationship(ExecuteSQL.REL_SUCCESS).get(0);
+
+        firstFlowFile.assertAttributeEquals(ExecuteSQL.RESULT_ROW_COUNT, "5");
+        firstFlowFile.assertAttributeNotExists(FragmentAttributes.FRAGMENT_COUNT.key());
+        firstFlowFile.assertAttributeEquals(FragmentAttributes.FRAGMENT_INDEX.key(), "0");
+        firstFlowFile.assertAttributeEquals(ExecuteSQL.RESULTSET_INDEX, "0");
+
+        MockFlowFile lastFlowFile = runner.getFlowFilesForRelationship(ExecuteSQL.REL_SUCCESS).get(199);
+
+        lastFlowFile.assertAttributeEquals(ExecuteSQL.RESULT_ROW_COUNT, "5");
+        lastFlowFile.assertAttributeEquals(FragmentAttributes.FRAGMENT_INDEX.key(), "199");
+        lastFlowFile.assertAttributeEquals(ExecuteSQL.RESULTSET_INDEX, "0");
+    }
+
+    @Test
+    public void testMaxRowsPerFlowFile() throws SQLException {
+        // remove previous test database, if any
+        final File dbLocation = new File(DB_LOCATION);
+        dbLocation.delete();
+
+        // load test data to database
+        final Connection con = ((DBCPService) runner.getControllerService("dbcp")).getConnection();
+        Statement stmt = con.createStatement();
+
+        try {
+            stmt.execute("drop table TEST_NULL_INT");
+        } catch (final SQLException sqle) {
+        }
+
+        stmt.execute("create table TEST_NULL_INT (id integer not null, val1 integer, val2 integer, constraint my_pk primary key (id))");
+
+        for (int i = 0; i < 1000; i++) {
+            stmt.execute("insert into TEST_NULL_INT (id, val1, val2) VALUES (" + i + ", 1, 1)");
+        }
+
+        runner.setIncomingConnection(false);
+        runner.setProperty(ExecuteSQL.MAX_ROWS_PER_FLOW_FILE, "5");
+        runner.setProperty(ExecuteSQL.OUTPUT_BATCH_SIZE, "0");
+        runner.setProperty(ExecuteSQL.SQL_SELECT_QUERY, "SELECT * FROM TEST_NULL_INT");
+        runner.run();
+
+        runner.assertAllFlowFilesTransferred(ExecuteSQL.REL_SUCCESS, 200);
+        runner.assertAllFlowFilesContainAttribute(ExecuteSQL.REL_SUCCESS, FragmentAttributes.FRAGMENT_INDEX.key());
+        runner.assertAllFlowFilesContainAttribute(ExecuteSQL.REL_SUCCESS, FragmentAttributes.FRAGMENT_ID.key());
+        runner.assertAllFlowFilesContainAttribute(ExecuteSQL.REL_SUCCESS, FragmentAttributes.FRAGMENT_COUNT.key());
+
+        MockFlowFile firstFlowFile = runner.getFlowFilesForRelationship(ExecuteSQL.REL_SUCCESS).get(0);
+
+        firstFlowFile.assertAttributeEquals(ExecuteSQL.RESULT_ROW_COUNT, "5");
+        firstFlowFile.assertAttributeEquals(FragmentAttributes.FRAGMENT_INDEX.key(), "0");
+        firstFlowFile.assertAttributeEquals(ExecuteSQL.RESULTSET_INDEX, "0");
+
+        MockFlowFile lastFlowFile = runner.getFlowFilesForRelationship(ExecuteSQL.REL_SUCCESS).get(199);
+
+        lastFlowFile.assertAttributeEquals(ExecuteSQL.RESULT_ROW_COUNT, "5");
+        lastFlowFile.assertAttributeEquals(FragmentAttributes.FRAGMENT_INDEX.key(), "199");
+        lastFlowFile.assertAttributeEquals(ExecuteSQL.RESULTSET_INDEX, "0");
+    }
+
+    @Test
     public void testInsertStatementCreatesFlowFile() throws SQLException {
         // remove previous test database, if any
         final File dbLocation = new File(DB_LOCATION);