Posted to commits@nifi.apache.org by pv...@apache.org on 2018/10/06 08:53:39 UTC

[1/4] nifi git commit: NIFI-4517: Added ExecuteSQLRecord and QueryDatabaseTableRecord processors

Repository: nifi
Updated Branches:
  refs/heads/master b4810b8dd -> c6572f042


http://git-wip-us.apache.org/repos/asf/nifi/blob/c6572f04/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExecuteSQLRecord.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExecuteSQLRecord.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExecuteSQLRecord.java
new file mode 100644
index 0000000..04c4c00
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExecuteSQLRecord.java
@@ -0,0 +1,376 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.standard;
+
+import org.apache.nifi.controller.AbstractControllerService;
+import org.apache.nifi.dbcp.DBCPService;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.flowfile.attributes.FragmentAttributes;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processors.standard.util.TestJdbcHugeStream;
+import org.apache.nifi.provenance.ProvenanceEventType;
+import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.serialization.record.MockRecordWriter;
+import org.apache.nifi.util.MockFlowFile;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+
+public class TestExecuteSQLRecord {
+
+    private static final Logger LOGGER;
+
+    static {
+        System.setProperty("org.slf4j.simpleLogger.defaultLogLevel", "info");
+        System.setProperty("org.slf4j.simpleLogger.showDateTime", "true");
+        System.setProperty("org.slf4j.simpleLogger.log.nifi.io.nio", "debug");
+        System.setProperty("org.slf4j.simpleLogger.log.nifi.processors.standard.ExecuteSQLRecord", "debug");
+        System.setProperty("org.slf4j.simpleLogger.log.nifi.processors.standard.TestExecuteSQLRecord", "debug");
+        LOGGER = LoggerFactory.getLogger(TestExecuteSQLRecord.class);
+    }
+
+    final static String DB_LOCATION = "target/db";
+
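+    // Three variants of the same three-way join: one resolved via Expression Language (${person.id}),
+    // one with a literal ID, and one using JDBC ? parameters.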
+    final static String QUERY_WITH_EL = "select "
+            + "  PER.ID as PersonId, PER.NAME as PersonName, PER.CODE as PersonCode"
+            + ", PRD.ID as ProductId,PRD.NAME as ProductName,PRD.CODE as ProductCode"
+            + ", REL.ID as RelId,    REL.NAME as RelName,    REL.CODE as RelCode"
+            + ", ROW_NUMBER() OVER () as rownr "
+            + " from persons PER, products PRD, relationships REL"
+            + " where PER.ID = ${person.id}";
+
+    final static String QUERY_WITHOUT_EL = "select "
+            + "  PER.ID as PersonId, PER.NAME as PersonName, PER.CODE as PersonCode"
+            + ", PRD.ID as ProductId,PRD.NAME as ProductName,PRD.CODE as ProductCode"
+            + ", REL.ID as RelId,    REL.NAME as RelName,    REL.CODE as RelCode"
+            + ", ROW_NUMBER() OVER () as rownr "
+            + " from persons PER, products PRD, relationships REL"
+            + " where PER.ID = 10";
+
+    final static String QUERY_WITHOUT_EL_WITH_PARAMS = "select "
+            + "  PER.ID as PersonId, PER.NAME as PersonName, PER.CODE as PersonCode"
+            + ", PRD.ID as ProductId,PRD.NAME as ProductName,PRD.CODE as ProductCode"
+            + ", REL.ID as RelId,    REL.NAME as RelName,    REL.CODE as RelCode"
+            + ", ROW_NUMBER() OVER () as rownr "
+            + " from persons PER, products PRD, relationships REL"
+            + " where PER.ID < ? AND REL.ID < ?";
+
+
+    @BeforeClass
+    public static void setupClass() {
+        System.setProperty("derby.stream.error.file", "target/derby.log");
+    }
+
+    private TestRunner runner;
+
+    @Before
+    public void setup() throws InitializationException {
+        final DBCPService dbcp = new DBCPServiceSimpleImpl();
+        final Map<String, String> dbcpProperties = new HashMap<>();
+
+        runner = TestRunners.newTestRunner(ExecuteSQLRecord.class);
+        runner.addControllerService("dbcp", dbcp, dbcpProperties);
+        runner.enableControllerService(dbcp);
+        runner.setProperty(AbstractExecuteSQL.DBCP_SERVICE, "dbcp");
+    }
+
+    @Test
+    public void testIncomingConnectionWithNoFlowFile() throws InitializationException {
+        runner.setIncomingConnection(true);
+        runner.setProperty(AbstractExecuteSQL.SQL_SELECT_QUERY, "SELECT * FROM persons");
+        MockRecordWriter recordWriter = new MockRecordWriter(null, true, -1);
+        runner.addControllerService("writer", recordWriter);
+        runner.setProperty(ExecuteSQLRecord.RECORD_WRITER_FACTORY, "writer");
+        runner.enableControllerService(recordWriter);
+        runner.run();
+        runner.assertTransferCount(AbstractExecuteSQL.REL_SUCCESS, 0);
+        runner.assertTransferCount(AbstractExecuteSQL.REL_FAILURE, 0);
+    }
+
+    @Test
+    public void testIncomingConnectionWithNoFlowFileAndNoQuery() throws InitializationException {
+        runner.setIncomingConnection(true);
+        MockRecordWriter recordWriter = new MockRecordWriter(null, true, -1);
+        runner.addControllerService("writer", recordWriter);
+        runner.setProperty(ExecuteSQLRecord.RECORD_WRITER_FACTORY, "writer");
+        runner.enableControllerService(recordWriter);
+        runner.run();
+        runner.assertTransferCount(AbstractExecuteSQL.REL_SUCCESS, 0);
+        runner.assertTransferCount(AbstractExecuteSQL.REL_FAILURE, 0);
+    }
+
+    @Test(expected = AssertionError.class)
+    public void testNoIncomingConnectionAndNoQuery() throws InitializationException {
+        runner.setIncomingConnection(false);
+        runner.run();
+    }
+
+    @Test
+    public void testNoIncomingConnection() throws ClassNotFoundException, SQLException, InitializationException, IOException {
+        runner.setIncomingConnection(false);
+        invokeOnTriggerRecords(null, QUERY_WITHOUT_EL, false, null, true);
+        assertEquals(ProvenanceEventType.RECEIVE, runner.getProvenanceEvents().get(0).getEventType());
+    }
+
+    @Test
+    public void testSelectQueryInFlowFile() throws InitializationException, ClassNotFoundException, SQLException, IOException {
+        invokeOnTriggerRecords(null, QUERY_WITHOUT_EL, true, null, false);
+        assertEquals(ProvenanceEventType.FORK, runner.getProvenanceEvents().get(0).getEventType());
+        assertEquals(ProvenanceEventType.FETCH, runner.getProvenanceEvents().get(1).getEventType());
+    }
+
+    @Test
+    public void testMaxRowsPerFlowFile() throws Exception {
+        // remove previous test database, if any
+        final File dbLocation = new File(DB_LOCATION);
+        dbLocation.delete();
+
+        // load test data to database
+        final Connection con = ((DBCPService) runner.getControllerService("dbcp")).getConnection();
+        Statement stmt = con.createStatement();
+
+        try {
+            stmt.execute("drop table TEST_NULL_INT");
+        } catch (final SQLException sqle) {
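+            // Ignore this error, probably a "table does not exist" since Derby doesn't yet support DROP IF EXISTS [DERBY-4842]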
+        }
+
+        stmt.execute("create table TEST_NULL_INT (id integer not null, val1 integer, val2 integer, constraint my_pk primary key (id))");
+
+        for (int i = 0; i < 1000; i++) {
+            stmt.execute("insert into TEST_NULL_INT (id, val1, val2) VALUES (" + i + ", 1, 1)");
+        }
+
+        runner.setIncomingConnection(false);
+        runner.setProperty(AbstractExecuteSQL.MAX_ROWS_PER_FLOW_FILE, "5");
+        runner.setProperty(AbstractExecuteSQL.OUTPUT_BATCH_SIZE, "0");
+        runner.setProperty(AbstractExecuteSQL.SQL_SELECT_QUERY, "SELECT * FROM TEST_NULL_INT");
+        MockRecordWriter recordWriter = new MockRecordWriter(null, true, -1);
+        runner.addControllerService("writer", recordWriter);
+        runner.setProperty(ExecuteSQLRecord.RECORD_WRITER_FACTORY, "writer");
+        runner.enableControllerService(recordWriter);
+        runner.run();
+
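+        // 1000 rows at 5 rows per flow file should yield 200 fragments, with fragment.index running 0..199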
+        runner.assertAllFlowFilesTransferred(AbstractExecuteSQL.REL_SUCCESS, 200);
+        runner.assertTransferCount(AbstractExecuteSQL.REL_FAILURE, 0);
+        runner.assertAllFlowFilesContainAttribute(AbstractExecuteSQL.REL_SUCCESS, FragmentAttributes.FRAGMENT_INDEX.key());
+        runner.assertAllFlowFilesContainAttribute(AbstractExecuteSQL.REL_SUCCESS, FragmentAttributes.FRAGMENT_ID.key());
+        runner.assertAllFlowFilesContainAttribute(AbstractExecuteSQL.REL_SUCCESS, FragmentAttributes.FRAGMENT_COUNT.key());
+
+        MockFlowFile firstFlowFile = runner.getFlowFilesForRelationship(AbstractExecuteSQL.REL_SUCCESS).get(0);
+
+        firstFlowFile.assertAttributeEquals(AbstractExecuteSQL.RESULT_ROW_COUNT, "5");
+        firstFlowFile.assertAttributeEquals("record.count", "5");
+        firstFlowFile.assertAttributeEquals(CoreAttributes.MIME_TYPE.key(), "text/plain"); // MockRecordWriter has text/plain MIME type
+        firstFlowFile.assertAttributeEquals(FragmentAttributes.FRAGMENT_INDEX.key(), "0");
+        firstFlowFile.assertAttributeEquals(AbstractExecuteSQL.RESULTSET_INDEX, "0");
+
+        MockFlowFile lastFlowFile = runner.getFlowFilesForRelationship(AbstractExecuteSQL.REL_SUCCESS).get(199);
+
+        lastFlowFile.assertAttributeEquals(AbstractExecuteSQL.RESULT_ROW_COUNT, "5");
+        lastFlowFile.assertAttributeEquals("record.count", "5");
+        lastFlowFile.assertAttributeEquals(CoreAttributes.MIME_TYPE.key(), "text/plain"); // MockRecordWriter has text/plain MIME type
+        lastFlowFile.assertAttributeEquals(FragmentAttributes.FRAGMENT_INDEX.key(), "199");
+        lastFlowFile.assertAttributeEquals(AbstractExecuteSQL.RESULTSET_INDEX, "0");
+    }
+
+    @Test
+    public void testInsertStatementCreatesFlowFile() throws Exception {
+        // remove previous test database, if any
+        final File dbLocation = new File(DB_LOCATION);
+        dbLocation.delete();
+
+        // load test data to database
+        final Connection con = ((DBCPService) runner.getControllerService("dbcp")).getConnection();
+        Statement stmt = con.createStatement();
+
+        try {
+            stmt.execute("drop table TEST_NULL_INT");
+        } catch (final SQLException sqle) {
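+            // Ignore this error, probably a "table does not exist" since Derby doesn't yet support DROP IF EXISTS [DERBY-4842]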
+        }
+
+        stmt.execute("create table TEST_NULL_INT (id integer not null, val1 integer, val2 integer, constraint my_pk primary key (id))");
+
+        runner.setIncomingConnection(false);
+        runner.setProperty(AbstractExecuteSQL.SQL_SELECT_QUERY, "insert into TEST_NULL_INT (id, val1, val2) VALUES (0, NULL, 1)");
+        MockRecordWriter recordWriter = new MockRecordWriter(null, true, -1);
+        runner.addControllerService("writer", recordWriter);
+        runner.setProperty(ExecuteSQLRecord.RECORD_WRITER_FACTORY, "writer");
+        runner.enableControllerService(recordWriter);
+        runner.run();
+
+        runner.assertAllFlowFilesTransferred(AbstractExecuteSQL.REL_SUCCESS, 1);
+        runner.getFlowFilesForRelationship(AbstractExecuteSQL.REL_SUCCESS).get(0).assertAttributeEquals(AbstractExecuteSQL.RESULT_ROW_COUNT, "0");
+    }
+
+    @Test
+    public void testNoRowsStatementCreatesEmptyFlowFile() throws Exception {
+        // remove previous test database, if any
+        final File dbLocation = new File(DB_LOCATION);
+        dbLocation.delete();
+
+        // load test data to database
+        final Connection con = ((DBCPService) runner.getControllerService("dbcp")).getConnection();
+        Statement stmt = con.createStatement();
+
+        try {
+            stmt.execute("drop table TEST_NULL_INT");
+        } catch (final SQLException sqle) {
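+            // Ignore this error, probably a "table does not exist" since Derby doesn't yet support DROP IF EXISTS [DERBY-4842]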
+        }
+
+        stmt.execute("create table TEST_NULL_INT (id integer not null, val1 integer, val2 integer, constraint my_pk primary key (id))");
+
+        runner.setIncomingConnection(true);
+        runner.setProperty(ExecuteSQL.SQL_SELECT_QUERY, "select * from TEST_NULL_INT");
+        MockRecordWriter recordWriter = new MockRecordWriter(null, true, -1);
+        runner.addControllerService("writer", recordWriter);
+        runner.setProperty(ExecuteSQLRecord.RECORD_WRITER_FACTORY, "writer");
+        runner.enableControllerService(recordWriter);
+        runner.enqueue("Hello".getBytes());
+        runner.run();
+
+        runner.assertAllFlowFilesTransferred(ExecuteSQL.REL_SUCCESS, 1);
+        MockFlowFile firstFlowFile = runner.getFlowFilesForRelationship(ExecuteSQL.REL_SUCCESS).get(0);
+        firstFlowFile.assertAttributeEquals(ExecuteSQL.RESULT_ROW_COUNT, "0");
+        firstFlowFile.assertContentEquals("");
+    }
+
+    @Test
+    public void testWithSqlException() throws Exception {
+        // remove previous test database, if any
+        final File dbLocation = new File(DB_LOCATION);
+        dbLocation.delete();
+
+        // load test data to database
+        final Connection con = ((DBCPService) runner.getControllerService("dbcp")).getConnection();
+        Statement stmt = con.createStatement();
+
+        try {
+            stmt.execute("drop table TEST_NO_ROWS");
+        } catch (final SQLException sqle) {
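+            // Ignore this error, probably a "table does not exist" since Derby doesn't yet support DROP IF EXISTS [DERBY-4842]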
+        }
+
+        stmt.execute("create table TEST_NO_ROWS (id integer)");
+
+        runner.setIncomingConnection(false);
+        // Try a valid SQL statement that will generate an error (e.g., column val1 does not exist in TEST_NO_ROWS)
+        runner.setProperty(AbstractExecuteSQL.SQL_SELECT_QUERY, "SELECT val1 FROM TEST_NO_ROWS");
+        MockRecordWriter recordWriter = new MockRecordWriter(null, true, -1);
+        runner.addControllerService("writer", recordWriter);
+        runner.setProperty(ExecuteSQLRecord.RECORD_WRITER_FACTORY, "writer");
+        runner.enableControllerService(recordWriter);
+        runner.run();
+
+        // With no incoming flow file containing a query, the SQL exception produces no outbound flow file,
+        // so there should be no flow files on either relationship
+        runner.assertAllFlowFilesTransferred(AbstractExecuteSQL.REL_FAILURE, 0);
+        runner.assertAllFlowFilesTransferred(AbstractExecuteSQL.REL_SUCCESS, 0);
+    }
+
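+    /**
+     * Loads test data with TestJdbcHugeStream, runs the processor once, and verifies that a single
+     * success flow file carries the query duration, execution time, fetch time and row count attributes.
+     *
+     * @param queryTimeout     optional Query Timeout to set on the processor, in seconds (ignored when null)
+     * @param query            the SQL to execute
+     * @param incomingFlowFile whether to enqueue an incoming flow file whose attributes supply ${person.id}
+     * @param attrs            extra attributes for the incoming flow file, or null for none
+     * @param setQueryProperty if true the query is set as the SQL Select Query property and the flow file
+     *                         content is a placeholder; otherwise the query is passed as the flow file content
+     */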
+    public void invokeOnTriggerRecords(final Integer queryTimeout, final String query, final boolean incomingFlowFile, final Map<String, String> attrs, final boolean setQueryProperty)
+            throws InitializationException, ClassNotFoundException, SQLException, IOException {
+
+        if (queryTimeout != null) {
+            runner.setProperty(AbstractExecuteSQL.QUERY_TIMEOUT, queryTimeout.toString() + " secs");
+        }
+
+        // remove previous test database, if any
+        final File dbLocation = new File(DB_LOCATION);
+        dbLocation.delete();
+
+        // load test data to database
+        final Connection con = ((DBCPService) runner.getControllerService("dbcp")).getConnection();
+        TestJdbcHugeStream.loadTestData2Database(con, 100, 200, 100);
+        LOGGER.info("test data loaded");
+
+        // ResultSet size will be 1 x 200 x 100 = 20,000 rows
+        // because the WHERE clause restricts PER.ID to a single person (e.g. PER.ID = ${person.id})
+        final int nrOfRows = 20000;
+
+        MockRecordWriter recordWriter = new MockRecordWriter(null, true, -1);
+        runner.addControllerService("writer", recordWriter);
+        runner.setProperty(ExecuteSQLRecord.RECORD_WRITER_FACTORY, "writer");
+        runner.enableControllerService(recordWriter);
+
+        if (incomingFlowFile) {
+            // the incoming FlowFile's attributes are always used (they supply ${person.id});
+            // its content is only used as the query when the query property is not set
+            final Map<String, String> attributes = (attrs == null) ? new HashMap<>() : attrs;
+            attributes.put("person.id", "10");
+            if (!setQueryProperty) {
+                runner.enqueue(query.getBytes(), attributes);
+            } else {
+                runner.enqueue("Hello".getBytes(), attributes);
+            }
+        }
+
+        if (setQueryProperty) {
+            runner.setProperty(AbstractExecuteSQL.SQL_SELECT_QUERY, query);
+        }
+
+        runner.run();
+        runner.assertAllFlowFilesTransferred(AbstractExecuteSQL.REL_SUCCESS, 1);
+        runner.assertAllFlowFilesContainAttribute(AbstractExecuteSQL.REL_SUCCESS, AbstractExecuteSQL.RESULT_QUERY_DURATION);
+        runner.assertAllFlowFilesContainAttribute(AbstractExecuteSQL.REL_SUCCESS, AbstractExecuteSQL.RESULT_QUERY_EXECUTION_TIME);
+        runner.assertAllFlowFilesContainAttribute(AbstractExecuteSQL.REL_SUCCESS, AbstractExecuteSQL.RESULT_QUERY_FETCH_TIME);
+        runner.assertAllFlowFilesContainAttribute(AbstractExecuteSQL.REL_SUCCESS, AbstractExecuteSQL.RESULT_ROW_COUNT);
+
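+        // The query duration attribute should equal execution time plus fetch time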
+        final List<MockFlowFile> flowfiles = runner.getFlowFilesForRelationship(AbstractExecuteSQL.REL_SUCCESS);
+        final long executionTime = Long.parseLong(flowfiles.get(0).getAttribute(AbstractExecuteSQL.RESULT_QUERY_EXECUTION_TIME));
+        final long fetchTime = Long.parseLong(flowfiles.get(0).getAttribute(AbstractExecuteSQL.RESULT_QUERY_FETCH_TIME));
+        final long durationTime = Long.parseLong(flowfiles.get(0).getAttribute(AbstractExecuteSQL.RESULT_QUERY_DURATION));
+        assertEquals(durationTime, fetchTime + executionTime);
+    }
+
+
+    /**
+     * Simple implementation only for ExecuteSQL processor testing.
+     */
+    class DBCPServiceSimpleImpl extends AbstractControllerService implements DBCPService {
+
+        @Override
+        public String getIdentifier() {
+            return "dbcp";
+        }
+
+        @Override
+        public Connection getConnection() throws ProcessException {
+            try {
+                Class.forName("org.apache.derby.jdbc.EmbeddedDriver");
+                final Connection con = DriverManager.getConnection("jdbc:derby:" + DB_LOCATION + ";create=true");
+                return con;
+            } catch (final Exception e) {
+                throw new ProcessException("getConnection failed: " + e);
+            }
+        }
+    }
+
+}
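
For reviewers who want the shared wiring at a glance: both new test classes register an embedded-Derby DBCPService and a MockRecordWriter as the record writer factory before running the processor. The condensed sketch below is assembled from the setup code above; the class name, the DerbyService helper, and the SYS.SYSTABLES query are illustrative placeholders rather than part of the commit.

    package org.apache.nifi.processors.standard;

    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.util.HashMap;

    import org.apache.nifi.controller.AbstractControllerService;
    import org.apache.nifi.dbcp.DBCPService;
    import org.apache.nifi.processor.exception.ProcessException;
    import org.apache.nifi.reporting.InitializationException;
    import org.apache.nifi.serialization.record.MockRecordWriter;
    import org.apache.nifi.util.TestRunner;
    import org.apache.nifi.util.TestRunners;
    import org.junit.Test;

    public class ExecuteSQLRecordWiringSketch {

        // Embedded-Derby connection pool, same shape as DBCPServiceSimpleImpl above.
        static class DerbyService extends AbstractControllerService implements DBCPService {
            @Override
            public String getIdentifier() {
                return "dbcp";
            }

            @Override
            public Connection getConnection() throws ProcessException {
                try {
                    Class.forName("org.apache.derby.jdbc.EmbeddedDriver");
                    return DriverManager.getConnection("jdbc:derby:target/db;create=true");
                } catch (final Exception e) {
                    throw new ProcessException("getConnection failed: " + e);
                }
            }
        }

        @Test
        public void wiresRecordWriterAndDbcpService() throws InitializationException {
            final TestRunner runner = TestRunners.newTestRunner(ExecuteSQLRecord.class);

            // Connection pool the processor uses to reach the database.
            final DBCPService dbcp = new DerbyService();
            runner.addControllerService("dbcp", dbcp, new HashMap<>());
            runner.enableControllerService(dbcp);
            runner.setProperty(AbstractExecuteSQL.DBCP_SERVICE, "dbcp");

            // Record writer that serializes result rows into the outgoing flow file content.
            final MockRecordWriter recordWriter = new MockRecordWriter(null, true, -1);
            runner.addControllerService("writer", recordWriter);
            runner.setProperty(ExecuteSQLRecord.RECORD_WRITER_FACTORY, "writer");
            runner.enableControllerService(recordWriter);

            // Query a Derby system table so no test data needs to be loaded.
            runner.setIncomingConnection(false);
            runner.setProperty(AbstractExecuteSQL.SQL_SELECT_QUERY, "SELECT * FROM SYS.SYSTABLES");
            runner.run();
            runner.assertTransferCount(AbstractExecuteSQL.REL_FAILURE, 0);
        }
    }

QueryDatabaseTableRecordTest (below) follows the same pattern, using QueryDatabaseTableRecord.DBCP_SERVICE and QueryDatabaseTableRecord.RECORD_WRITER_FACTORY.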


[2/4] nifi git commit: NIFI-4517: Added ExecuteSQLRecord and QueryDatabaseTableRecord processors

Posted by pv...@apache.org.
http://git-wip-us.apache.org/repos/asf/nifi/blob/c6572f04/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/QueryDatabaseTableRecordTest.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/QueryDatabaseTableRecordTest.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/QueryDatabaseTableRecordTest.java
new file mode 100644
index 0000000..a1d67c0
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/QueryDatabaseTableRecordTest.java
@@ -0,0 +1,1332 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.standard;
+
+import org.apache.nifi.annotation.behavior.Stateful;
+import org.apache.nifi.components.state.Scope;
+import org.apache.nifi.components.state.StateManager;
+import org.apache.nifi.controller.AbstractControllerService;
+import org.apache.nifi.dbcp.DBCPService;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processors.standard.db.DatabaseAdapter;
+import org.apache.nifi.processors.standard.db.impl.GenericDatabaseAdapter;
+import org.apache.nifi.processors.standard.db.impl.MSSQLDatabaseAdapter;
+import org.apache.nifi.processors.standard.db.impl.MySQLDatabaseAdapter;
+import org.apache.nifi.processors.standard.db.impl.OracleDatabaseAdapter;
+import org.apache.nifi.processors.standard.db.impl.PhoenixDatabaseAdapter;
+import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.serialization.record.MockRecordWriter;
+import org.apache.nifi.util.MockFlowFile;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.apache.nifi.util.file.FileUtils;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.SQLException;
+import java.sql.SQLNonTransientConnectionException;
+import java.sql.Statement;
+import java.sql.Types;
+import java.text.SimpleDateFormat;
+import java.util.Arrays;
+import java.util.Calendar;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.TimeZone;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Unit tests for the QueryDatabaseTableRecord processor
+ */
+public class QueryDatabaseTableRecordTest {
+
+    MockQueryDatabaseTableRecord processor;
+    private TestRunner runner;
+    private final static String DB_LOCATION = "target/db_qdt";
+    private DatabaseAdapter dbAdapter;
+    private HashMap<String, DatabaseAdapter> origDbAdapters;
+    private final static String TABLE_NAME_KEY = "tableName";
+    private final static String MAX_ROWS_KEY = "maxRows";
+
+
+    @BeforeClass
+    public static void setupBeforeClass() {
+        System.setProperty("derby.stream.error.file", "target/derby.log");
+
+        // remove previous test database, if any
+        final File dbLocation = new File(DB_LOCATION);
+        try {
+            FileUtils.deleteFile(dbLocation, true);
+        } catch (IOException ioe) {
+            // Do nothing, may not have existed
+        }
+    }
+
+    @AfterClass
+    public static void cleanUpAfterClass() throws Exception {
+        try {
+            DriverManager.getConnection("jdbc:derby:" + DB_LOCATION + ";shutdown=true");
+        } catch (SQLNonTransientConnectionException e) {
+            // Do nothing, this is what happens at Derby shutdown
+        }
+        // remove previous test database, if any
+        final File dbLocation = new File(DB_LOCATION);
+        try {
+            FileUtils.deleteFile(dbLocation, true);
+        } catch (IOException ioe) {
+            // Do nothing, may not have existed
+        }
+    }
+
+
+    @Before
+    public void setup() throws InitializationException, IOException {
+        final DBCPService dbcp = new DBCPServiceSimpleImpl();
+        final Map<String, String> dbcpProperties = new HashMap<>();
+        origDbAdapters = new HashMap<>(QueryDatabaseTableRecord.dbAdapters);
+        dbAdapter = new GenericDatabaseAdapter();
+        QueryDatabaseTableRecord.dbAdapters.put(dbAdapter.getName(), dbAdapter);
+        processor = new MockQueryDatabaseTableRecord();
+        runner = TestRunners.newTestRunner(processor);
+        runner.addControllerService("dbcp", dbcp, dbcpProperties);
+        runner.enableControllerService(dbcp);
+        runner.setProperty(QueryDatabaseTableRecord.DBCP_SERVICE, "dbcp");
+        runner.setProperty(QueryDatabaseTableRecord.DB_TYPE, dbAdapter.getName());
+        runner.getStateManager().clear(Scope.CLUSTER);
+        MockRecordWriter recordWriter = new MockRecordWriter(null, true, -1);
+        runner.addControllerService("writer", recordWriter);
+        runner.setProperty(QueryDatabaseTableRecord.RECORD_WRITER_FACTORY, "writer");
+        runner.enableControllerService(recordWriter);
+    }
+
+    @After
+    public void teardown() throws IOException {
+        runner.getStateManager().clear(Scope.CLUSTER);
+        runner = null;
+        QueryDatabaseTableRecord.dbAdapters.clear();
+        QueryDatabaseTableRecord.dbAdapters.putAll(origDbAdapters);
+    }
+
+    @Test
+    public void testGetQuery() throws Exception {
+        String query = processor.getQuery(dbAdapter, "myTable", null, null, null, null);
+        assertEquals("SELECT * FROM myTable", query);
+        query = processor.getQuery(dbAdapter, "myTable", "col1,col2", null, null, null);
+        assertEquals("SELECT col1,col2 FROM myTable", query);
+
+        query = processor.getQuery(dbAdapter, "myTable", null, Collections.singletonList("id"), null, null);
+        assertEquals("SELECT * FROM myTable", query);
+
+        Map<String, String> maxValues = new HashMap<>();
+        maxValues.put("id", "509");
+        StateManager stateManager = runner.getStateManager();
+        stateManager.setState(maxValues, Scope.CLUSTER);
+        processor.putColumnType(AbstractDatabaseFetchProcessor.getStateKey("mytable", "id", dbAdapter), Types.INTEGER);
+        query = processor.getQuery(dbAdapter, "myTable", null, Collections.singletonList("id"), null, stateManager.getState(Scope.CLUSTER).toMap());
+        assertEquals("SELECT * FROM myTable WHERE id > 509", query);
+
+        maxValues.put("date_created", "2016-03-07 12:34:56");
+        stateManager.setState(maxValues, Scope.CLUSTER);
+        processor.putColumnType(AbstractDatabaseFetchProcessor.getStateKey("mytable", "date_created", dbAdapter), Types.TIMESTAMP);
+        query = processor.getQuery(dbAdapter, "myTable", null, Arrays.asList("id", "DATE_CREATED"), null, stateManager.getState(Scope.CLUSTER).toMap());
+        assertEquals("SELECT * FROM myTable WHERE id > 509 AND DATE_CREATED >= '2016-03-07 12:34:56'", query);
+
+        // Double quotes can be used to escape column and table names with most ANSI-compatible database engines.
+        maxValues.put("mytable@!@date-created", "2016-03-07 12:34:56");
+        stateManager.setState(maxValues, Scope.CLUSTER);
+        processor.putColumnType(AbstractDatabaseFetchProcessor.getStateKey("\"myTable\"", "\"DATE-CREATED\"", dbAdapter), Types.TIMESTAMP);
+        query = processor.getQuery(dbAdapter, "\"myTable\"", null, Arrays.asList("id", "\"DATE-CREATED\""), null, stateManager.getState(Scope.CLUSTER).toMap());
+        assertEquals("SELECT * FROM \"myTable\" WHERE id > 509 AND \"DATE-CREATED\" >= '2016-03-07 12:34:56'", query);
+
+        // Back-ticks can be used to escape MySQL column and table names.
+        dbAdapter = new MySQLDatabaseAdapter();
+        processor.putColumnType(AbstractDatabaseFetchProcessor.getStateKey("`myTable`", "`DATE-CREATED`", dbAdapter), Types.TIMESTAMP);
+        query = processor.getQuery(dbAdapter, "`myTable`", null, Arrays.asList("id", "`DATE-CREATED`"), null, stateManager.getState(Scope.CLUSTER).toMap());
+        assertEquals("SELECT * FROM `myTable` WHERE id > 509 AND `DATE-CREATED` >= '2016-03-07 12:34:56'", query);
+
+        // Square brackets can be used to escape Microsoft SQL Server column and table names.
+        dbAdapter = new MSSQLDatabaseAdapter();
+        processor.putColumnType(AbstractDatabaseFetchProcessor.getStateKey("[myTable]", "[DATE-CREATED]", dbAdapter), Types.TIMESTAMP);
+        query = processor.getQuery(dbAdapter, "[myTable]", null, Arrays.asList("id", "[DATE-CREATED]"), null, stateManager.getState(Scope.CLUSTER).toMap());
+        assertEquals("SELECT * FROM [myTable] WHERE id > 509 AND [DATE-CREATED] >= '2016-03-07 12:34:56'", query);
+
+        // Test Oracle strategy
+        dbAdapter = new OracleDatabaseAdapter();
+        query = processor.getQuery(dbAdapter, "myTable", null, Arrays.asList("id", "DATE_CREATED"), "type = \"CUSTOMER\"", stateManager.getState(Scope.CLUSTER).toMap());
+        assertEquals("SELECT * FROM myTable WHERE id > 509 AND DATE_CREATED >= timestamp '2016-03-07 12:34:56' AND (type = \"CUSTOMER\")", query);
+
+        // Test a TIME-typed max-value column.
+        processor.putColumnType("mytable" + AbstractDatabaseFetchProcessor.NAMESPACE_DELIMITER + "time_created", Types.TIME);
+        maxValues.clear();
+        maxValues.put("id", "509");
+        maxValues.put("time_created", "12:34:57");
+        maxValues.put("date_created", "2016-03-07 12:34:56");
+        stateManager = runner.getStateManager();
+        stateManager.clear(Scope.CLUSTER);
+        stateManager.setState(maxValues, Scope.CLUSTER);
+        query = processor.getQuery(dbAdapter, "myTable", null, Arrays.asList("id", "DATE_CREATED", "TIME_CREATED"), "type = \"CUSTOMER\"", stateManager.getState(Scope.CLUSTER).toMap());
+        assertEquals("SELECT * FROM myTable WHERE id > 509 AND DATE_CREATED >= timestamp '2016-03-07 12:34:56' AND TIME_CREATED >= timestamp '12:34:57' AND (type = \"CUSTOMER\")", query);
+        dbAdapter = new GenericDatabaseAdapter();
+        query = processor.getQuery(dbAdapter, "myTable", null, Arrays.asList("id", "DATE_CREATED", "TIME_CREATED"), "type = \"CUSTOMER\"", stateManager.getState(Scope.CLUSTER).toMap());
+        assertEquals("SELECT * FROM myTable WHERE id > 509 AND DATE_CREATED >= '2016-03-07 12:34:56' AND TIME_CREATED >= '12:34:57' AND (type = \"CUSTOMER\")", query);
+    }
+
+    @Test
+    public void testGetQueryUsingPhoenixAdapter() throws Exception {
+        Map<String, String> maxValues = new HashMap<>();
+        StateManager stateManager = runner.getStateManager();
+        processor.putColumnType("mytable" + AbstractDatabaseFetchProcessor.NAMESPACE_DELIMITER + "id", Types.INTEGER);
+        processor.putColumnType("mytable" + AbstractDatabaseFetchProcessor.NAMESPACE_DELIMITER + "time_created", Types.TIME);
+        processor.putColumnType("mytable" + AbstractDatabaseFetchProcessor.NAMESPACE_DELIMITER + "date_created", Types.TIMESTAMP);
+
+        maxValues.put("id", "509");
+        maxValues.put("time_created", "12:34:57");
+        maxValues.put("date_created", "2016-03-07 12:34:56");
+        stateManager.setState(maxValues, Scope.CLUSTER);
+
+        dbAdapter = new PhoenixDatabaseAdapter();
+        String query = processor.getQuery(dbAdapter, "myTable", null, Arrays.asList("id", "DATE_CREATED", "TIME_CREATED"), "type = \"CUSTOMER\"", stateManager.getState(Scope.CLUSTER).toMap());
+        assertEquals("SELECT * FROM myTable WHERE id > 509 AND DATE_CREATED >= timestamp '2016-03-07 12:34:56' AND TIME_CREATED >= time '12:34:57' AND (type = \"CUSTOMER\")", query);
+        // Cover the non-Phoenix (generic adapter) path with the same state
+        dbAdapter = new GenericDatabaseAdapter();
+        query = processor.getQuery(dbAdapter, "myTable", null, Arrays.asList("id", "DATE_CREATED", "TIME_CREATED"), "type = \"CUSTOMER\"", stateManager.getState(Scope.CLUSTER).toMap());
+        assertEquals("SELECT * FROM myTable WHERE id > 509 AND DATE_CREATED >= '2016-03-07 12:34:56' AND TIME_CREATED >= '12:34:57' AND (type = \"CUSTOMER\")", query);
+    }
+
+    @Test(expected = IllegalArgumentException.class)
+    public void testGetQueryNoTable() {
+        processor.getQuery(dbAdapter, null, null, null, null, null);
+    }
+
+    @Test
+    public void testAddedRows() throws SQLException, IOException {
+
+        // load test data to database
+        final Connection con = ((DBCPService) runner.getControllerService("dbcp")).getConnection();
+        Statement stmt = con.createStatement();
+
+        try {
+            stmt.execute("drop table TEST_QUERY_DB_TABLE");
+        } catch (final SQLException sqle) {
+            // Ignore this error, probably a "table does not exist" since Derby doesn't yet support DROP IF EXISTS [DERBY-4842]
+        }
+
+        stmt.execute("create table TEST_QUERY_DB_TABLE (id integer not null, name varchar(100), scale float, created_on timestamp, bignum bigint default 0)");
+        stmt.execute("insert into TEST_QUERY_DB_TABLE (id, name, scale, created_on) VALUES (0, 'Joe Smith', 1.0, '1962-09-23 03:23:34.234')");
+        stmt.execute("insert into TEST_QUERY_DB_TABLE (id, name, scale, created_on) VALUES (1, 'Carrie Jones', 5.0, '2000-01-01 03:23:34.234')");
+        stmt.execute("insert into TEST_QUERY_DB_TABLE (id, name, scale, created_on) VALUES (2, NULL, 2.0, '2010-01-01 00:00:00')");
+
+        runner.setProperty(QueryDatabaseTableRecord.TABLE_NAME, "TEST_QUERY_DB_TABLE");
+        runner.setIncomingConnection(false);
+        runner.setProperty(QueryDatabaseTableRecord.MAX_VALUE_COLUMN_NAMES, "ID");
+        runner.setProperty(QueryDatabaseTableRecord.MAX_ROWS_PER_FLOW_FILE, "2");
+
+        runner.run();
+        runner.assertAllFlowFilesTransferred(QueryDatabaseTableRecord.REL_SUCCESS, 2);
+
+        MockFlowFile flowFile = runner.getFlowFilesForRelationship(QueryDatabaseTableRecord.REL_SUCCESS).get(0);
+        assertEquals("TEST_QUERY_DB_TABLE", flowFile.getAttribute(QueryDatabaseTableRecord.RESULT_TABLENAME));
+        assertEquals(flowFile.getAttribute("maxvalue.id"), "2");
+        runner.setProperty(QueryDatabaseTableRecord.FETCH_SIZE, "2");
+        flowFile.assertAttributeEquals("record.count", "2");
+
+        flowFile = runner.getFlowFilesForRelationship(QueryDatabaseTableRecord.REL_SUCCESS).get(1);
+        assertEquals(flowFile.getAttribute("maxvalue.id"), "2");
+        flowFile.assertAttributeEquals("record.count", "1");
+        runner.clearTransferState();
+
+        // Run again, this time no flowfiles/rows should be transferred
+        runner.run();
+        runner.assertAllFlowFilesTransferred(QueryDatabaseTableRecord.REL_SUCCESS, 0);
+        runner.clearTransferState();
+
+        // Remove Max Rows Per Flow File
+        runner.setProperty(QueryDatabaseTableRecord.MAX_ROWS_PER_FLOW_FILE, "0");
+
+        // Add a new row with a higher ID and run, one flowfile with one new row should be transferred
+        stmt.execute("insert into TEST_QUERY_DB_TABLE (id, name, scale, created_on) VALUES (3, 'Mary West', 15.0, '2000-01-01 03:23:34.234')");
+        runner.run();
+        runner.assertAllFlowFilesTransferred(QueryDatabaseTableRecord.REL_SUCCESS, 1);
+        flowFile = runner.getFlowFilesForRelationship(QueryDatabaseTableRecord.REL_SUCCESS).get(0);
+        assertEquals(flowFile.getAttribute("maxvalue.id"), "3");
+        flowFile.assertAttributeEquals("record.count", "1");
+
+        // Sanity check - run again, this time no flowfiles/rows should be transferred
+        runner.clearTransferState();
+        runner.run();
+        runner.assertAllFlowFilesTransferred(QueryDatabaseTableRecord.REL_SUCCESS, 0);
+        runner.clearTransferState();
+
+        // Add timestamp as a max value column name
+        runner.setProperty(QueryDatabaseTableRecord.MAX_VALUE_COLUMN_NAMES, "id, created_on");
+
+        // Add a new row with a higher ID and run, one flow file will be transferred because no max value for the timestamp has been stored
+        stmt.execute("insert into TEST_QUERY_DB_TABLE (id, name, scale, created_on) VALUES (4, 'Marty Johnson', 15.0, '2011-01-01 03:23:34.234')");
+        runner.run();
+        runner.assertAllFlowFilesTransferred(QueryDatabaseTableRecord.REL_SUCCESS, 1);
+        flowFile = runner.getFlowFilesForRelationship(QueryDatabaseTableRecord.REL_SUCCESS).get(0);
+        assertEquals(flowFile.getAttribute("maxvalue.id"), "4");
+        assertEquals(flowFile.getAttribute("maxvalue.created_on"), "2011-01-01 03:23:34.234");
+        flowFile.assertAttributeEquals("record.count", "1");
+        runner.clearTransferState();
+
+        // Add a new row with a higher ID but lower timestamp and run, no flow file will be transferred
+        stmt.execute("insert into TEST_QUERY_DB_TABLE (id, name, scale, created_on) VALUES (5, 'NO NAME', 15.0, '2001-01-01 03:23:34.234')");
+        runner.run();
+        runner.assertAllFlowFilesTransferred(QueryDatabaseTableRecord.REL_SUCCESS, 0);
+        runner.clearTransferState();
+
+        // Add a new row with a higher ID and run, one flow file will be transferred
+        stmt.execute("insert into TEST_QUERY_DB_TABLE (id, name, scale, created_on) VALUES (6, 'Mr. NiFi', 1.0, '2012-01-01 03:23:34.234')");
+        runner.run();
+        runner.assertAllFlowFilesTransferred(QueryDatabaseTableRecord.REL_SUCCESS, 1);
+        flowFile = runner.getFlowFilesForRelationship(QueryDatabaseTableRecord.REL_SUCCESS).get(0);
+        flowFile.assertAttributeEquals("record.count", "1");
+        runner.clearTransferState();
+
+        // Set name as the max value column name (and clear the state), all rows should be returned since the max value for name has not been set
+        runner.getStateManager().clear(Scope.CLUSTER);
+        runner.setProperty(QueryDatabaseTableRecord.MAX_VALUE_COLUMN_NAMES, "name");
+        runner.run();
+        runner.assertAllFlowFilesTransferred(QueryDatabaseTableRecord.REL_SUCCESS, 1);
+        flowFile = runner.getFlowFilesForRelationship(QueryDatabaseTableRecord.REL_SUCCESS).get(0);
+        flowFile.assertAttributeEquals("record.count", "7");
+        runner.clearTransferState();
+
+        // Add a new row with a "higher" name than the max but lower than "NULL" (to test that null values are skipped), one flow file will be transferred
+        stmt.execute("insert into TEST_QUERY_DB_TABLE (id, name, scale, created_on) VALUES (7, 'NULK', 1.0, '2012-01-01 03:23:34.234')");
+        runner.run();
+        runner.assertAllFlowFilesTransferred(QueryDatabaseTableRecord.REL_SUCCESS, 1);
+        flowFile = runner.getFlowFilesForRelationship(QueryDatabaseTableRecord.REL_SUCCESS).get(0);
+        flowFile.assertAttributeEquals("record.count", "1");
+        runner.clearTransferState();
+
+        // Set scale as the max value column name (and clear the state), all rows should be returned since the max value for scale has not been set
+        runner.getStateManager().clear(Scope.CLUSTER);
+        runner.setProperty(QueryDatabaseTableRecord.MAX_VALUE_COLUMN_NAMES, "scale");
+        runner.run();
+        runner.assertAllFlowFilesTransferred(QueryDatabaseTableRecord.REL_SUCCESS, 1);
+        flowFile = runner.getFlowFilesForRelationship(QueryDatabaseTableRecord.REL_SUCCESS).get(0);
+        flowFile.assertAttributeEquals("record.count", "8");
+        runner.clearTransferState();
+
+        // Add a new row with a higher value for scale than the max, one flow file will be transferred
+        stmt.execute("insert into TEST_QUERY_DB_TABLE (id, name, scale, created_on) VALUES (8, 'NULK', 100.0, '2012-01-01 03:23:34.234')");
+        runner.run();
+        runner.assertAllFlowFilesTransferred(QueryDatabaseTableRecord.REL_SUCCESS, 1);
+        flowFile = runner.getFlowFilesForRelationship(QueryDatabaseTableRecord.REL_SUCCESS).get(0);
+        flowFile.assertAttributeEquals("record.count", "1");
+        runner.clearTransferState();
+
+        // Set bignum as the max value column name (and clear the state), all rows should be returned since the max value for bignum has not been set
+        runner.getStateManager().clear(Scope.CLUSTER);
+        runner.setProperty(QueryDatabaseTableRecord.MAX_VALUE_COLUMN_NAMES, "bignum");
+        runner.run();
+        runner.assertAllFlowFilesTransferred(QueryDatabaseTableRecord.REL_SUCCESS, 1);
+        flowFile = runner.getFlowFilesForRelationship(QueryDatabaseTableRecord.REL_SUCCESS).get(0);
+        flowFile.assertAttributeEquals("record.count", "9");
+        runner.clearTransferState();
+
+        // Add a new row with a higher value for bignum than the max, one flow file will be transferred
+        stmt.execute("insert into TEST_QUERY_DB_TABLE (id, name, scale, created_on, bignum) VALUES (9, 'Alice Bob', 100.0, '2012-01-01 03:23:34.234', 1)");
+        runner.run();
+        runner.assertAllFlowFilesTransferred(QueryDatabaseTableRecord.REL_SUCCESS, 1);
+        flowFile = runner.getFlowFilesForRelationship(QueryDatabaseTableRecord.REL_SUCCESS).get(0);
+        flowFile.assertAttributeEquals("record.count", "1");
+        runner.clearTransferState();
+    }
+
+    @Test
+    public void testAddedRowsTwoTables() throws SQLException {
+
+        // load test data to database
+        final Connection con = ((DBCPService) runner.getControllerService("dbcp")).getConnection();
+        Statement stmt = con.createStatement();
+
+        try {
+            stmt.execute("drop table TEST_QUERY_DB_TABLE");
+        } catch (final SQLException sqle) {
+            // Ignore this error, probably a "table does not exist" since Derby doesn't yet support DROP IF EXISTS [DERBY-4842]
+        }
+
+        try {
+            stmt.execute("drop table TEST_QUERY_DB_TABLE2");
+        } catch (final SQLException sqle) {
+            // Ignore this error, probably a "table does not exist" since Derby doesn't yet support DROP IF EXISTS [DERBY-4842]
+        }
+
+        stmt.execute("create table TEST_QUERY_DB_TABLE (id integer not null, name varchar(100), scale float, created_on timestamp, bignum bigint default 0)");
+        stmt.execute("insert into TEST_QUERY_DB_TABLE (id, name, scale, created_on) VALUES (0, 'Joe Smith', 1.0, '1962-09-23 03:23:34.234')");
+        stmt.execute("insert into TEST_QUERY_DB_TABLE (id, name, scale, created_on) VALUES (1, 'Carrie Jones', 5.0, '2000-01-01 03:23:34.234')");
+        stmt.execute("insert into TEST_QUERY_DB_TABLE (id, name, scale, created_on) VALUES (2, NULL, 2.0, '2010-01-01 00:00:00')");
+
+        runner.setProperty(QueryDatabaseTableRecord.TABLE_NAME, "TEST_QUERY_DB_TABLE");
+        runner.setIncomingConnection(false);
+        runner.setProperty(QueryDatabaseTableRecord.MAX_VALUE_COLUMN_NAMES, "ID");
+        runner.setProperty(QueryDatabaseTableRecord.MAX_ROWS_PER_FLOW_FILE, "2");
+
+        runner.run();
+        runner.assertAllFlowFilesTransferred(QueryDatabaseTableRecord.REL_SUCCESS, 2);
+
+        MockFlowFile flowFile = runner.getFlowFilesForRelationship(QueryDatabaseTableRecord.REL_SUCCESS).get(0);
+        assertEquals("TEST_QUERY_DB_TABLE", flowFile.getAttribute(QueryDatabaseTableRecord.RESULT_TABLENAME));
+        assertEquals(flowFile.getAttribute("maxvalue.id"), "2");
+        flowFile.assertAttributeEquals("record.count", "2");
+        flowFile = runner.getFlowFilesForRelationship(QueryDatabaseTableRecord.REL_SUCCESS).get(1);
+        assertEquals(flowFile.getAttribute("maxvalue.id"), "2");
+        flowFile.assertAttributeEquals("record.count", "1");
+        runner.clearTransferState();
+
+        // Populate a second table and set it as the table to query
+        stmt.execute("create table TEST_QUERY_DB_TABLE2 (id integer not null, name varchar(100), scale float, created_on timestamp, bignum bigint default 0)");
+        stmt.execute("insert into TEST_QUERY_DB_TABLE2 (id, name, scale, created_on) VALUES (0, 'Joe Smith', 1.0, '1962-09-23 03:23:34.234')");
+        stmt.execute("insert into TEST_QUERY_DB_TABLE2 (id, name, scale, created_on) VALUES (1, 'Carrie Jones', 5.0, '2000-01-01 03:23:34.234')");
+        stmt.execute("insert into TEST_QUERY_DB_TABLE2 (id, name, scale, created_on) VALUES (2, NULL, 2.0, '2010-01-01 00:00:00')");
+
+        runner.setProperty(QueryDatabaseTableRecord.TABLE_NAME, "TEST_QUERY_DB_TABLE2");
+        runner.setProperty(QueryDatabaseTableRecord.MAX_ROWS_PER_FLOW_FILE, "0");
+        runner.run();
+        runner.assertAllFlowFilesTransferred(QueryDatabaseTableRecord.REL_SUCCESS, 1);
+
+        flowFile = runner.getFlowFilesForRelationship(QueryDatabaseTableRecord.REL_SUCCESS).get(0);
+        assertEquals("TEST_QUERY_DB_TABLE2", flowFile.getAttribute(QueryDatabaseTableRecord.RESULT_TABLENAME));
+        assertEquals(flowFile.getAttribute("maxvalue.id"), "2");
+        flowFile.assertAttributeEquals("record.count", "3");
+        runner.clearTransferState();
+
+        // Add a new row with a higher ID and run, one flowfile with one new row should be transferred
+        stmt.execute("insert into TEST_QUERY_DB_TABLE2 (id, name, scale, created_on) VALUES (3, 'Mary West', 15.0, '2000-01-01 03:23:34.234')");
+        runner.run();
+        runner.assertAllFlowFilesTransferred(QueryDatabaseTableRecord.REL_SUCCESS, 1);
+        flowFile = runner.getFlowFilesForRelationship(QueryDatabaseTableRecord.REL_SUCCESS).get(0);
+        assertEquals(flowFile.getAttribute("maxvalue.id"), "3");
+        flowFile.assertAttributeEquals("record.count", "1");
+
+        // Sanity check - run again, this time no flowfiles/rows should be transferred
+        runner.clearTransferState();
+        runner.run();
+        runner.assertAllFlowFilesTransferred(QueryDatabaseTableRecord.REL_SUCCESS, 0);
+        runner.clearTransferState();
+    }
+
+    @Test
+    public void testMultiplePartitions() throws SQLException {
+
+        // load test data to database
+        final Connection con = ((DBCPService) runner.getControllerService("dbcp")).getConnection();
+        Statement stmt = con.createStatement();
+
+        try {
+            stmt.execute("drop table TEST_QUERY_DB_TABLE");
+        } catch (final SQLException sqle) {
+            // Ignore this error, probably a "table does not exist" since Derby doesn't yet support DROP IF EXISTS [DERBY-4842]
+        }
+
+        stmt.execute("create table TEST_QUERY_DB_TABLE (id integer not null, bucket integer not null)");
+        stmt.execute("insert into TEST_QUERY_DB_TABLE (id, bucket) VALUES (0, 0)");
+        stmt.execute("insert into TEST_QUERY_DB_TABLE (id, bucket) VALUES (1, 0)");
+
+        runner.setProperty(QueryDatabaseTableRecord.TABLE_NAME, "TEST_QUERY_DB_TABLE");
+        runner.setIncomingConnection(false);
+        runner.setProperty(QueryDatabaseTableRecord.MAX_VALUE_COLUMN_NAMES, "ID, BUCKET");
+
+        runner.run();
+        runner.assertAllFlowFilesTransferred(QueryDatabaseTableRecord.REL_SUCCESS, 1);
+        assertEquals("2",
+                runner.getFlowFilesForRelationship(QueryDatabaseTableRecord.REL_SUCCESS).get(0).getAttribute(QueryDatabaseTableRecord.RESULT_ROW_COUNT)
+        );
+        runner.clearTransferState();
+
+        // Add a new row in the same bucket
+        stmt.execute("insert into TEST_QUERY_DB_TABLE (id, bucket) VALUES (2, 0)");
+        runner.run();
+        runner.assertAllFlowFilesTransferred(QueryDatabaseTableRecord.REL_SUCCESS, 1);
+        assertEquals("1",
+                runner.getFlowFilesForRelationship(QueryDatabaseTableRecord.REL_SUCCESS).get(0).getAttribute(QueryDatabaseTableRecord.RESULT_ROW_COUNT)
+        );
+        runner.clearTransferState();
+
+        // Add a new row in a new bucket
+        stmt.execute("insert into TEST_QUERY_DB_TABLE (id, bucket) VALUES (3, 1)");
+        runner.run();
+        runner.assertAllFlowFilesTransferred(QueryDatabaseTableRecord.REL_SUCCESS, 1);
+        assertEquals("1",
+                runner.getFlowFilesForRelationship(QueryDatabaseTableRecord.REL_SUCCESS).get(0).getAttribute(QueryDatabaseTableRecord.RESULT_ROW_COUNT)
+        );
+        runner.clearTransferState();
+
+        // Add a new row in an old bucket, it should not be transferred
+        stmt.execute("insert into TEST_QUERY_DB_TABLE (id, bucket) VALUES (4, 0)");
+        runner.run();
+        runner.assertTransferCount(QueryDatabaseTableRecord.REL_SUCCESS, 0);
+
+        // Add a new row in the second bucket, only the new row should be transferred
+        stmt.execute("insert into TEST_QUERY_DB_TABLE (id, bucket) VALUES (5, 1)");
+        runner.run();
+        runner.assertAllFlowFilesTransferred(QueryDatabaseTableRecord.REL_SUCCESS, 1);
+        assertEquals("1",
+                runner.getFlowFilesForRelationship(QueryDatabaseTableRecord.REL_SUCCESS).get(0).getAttribute(QueryDatabaseTableRecord.RESULT_ROW_COUNT)
+        );
+        runner.clearTransferState();
+    }
+
+    @Test
+    public void testTimestampNanos() throws SQLException {
+
+        // load test data to database
+        final Connection con = ((DBCPService) runner.getControllerService("dbcp")).getConnection();
+        Statement stmt = con.createStatement();
+
+        try {
+            stmt.execute("drop table TEST_QUERY_DB_TABLE");
+        } catch (final SQLException sqle) {
+            // Ignore this error, probably a "table does not exist" since Derby doesn't yet support DROP IF EXISTS [DERBY-4842]
+        }
+
+        stmt.execute("create table TEST_QUERY_DB_TABLE (id integer not null, name varchar(100), scale float, created_on timestamp, bignum bigint default 0)");
+        stmt.execute("insert into TEST_QUERY_DB_TABLE (id, name, scale, created_on) VALUES (1, 'Carrie Jones', 5.0, '2000-01-01 03:23:34.000123456')");
+
+        runner.setProperty(QueryDatabaseTableRecord.TABLE_NAME, "TEST_QUERY_DB_TABLE");
+        runner.setIncomingConnection(false);
+        runner.setProperty(QueryDatabaseTableRecord.MAX_VALUE_COLUMN_NAMES, "created_on");
+
+        runner.run();
+        runner.assertAllFlowFilesTransferred(QueryDatabaseTableRecord.REL_SUCCESS, 1);
+        MockFlowFile flowFile = runner.getFlowFilesForRelationship(QueryDatabaseTableRecord.REL_SUCCESS).get(0);
+        flowFile.assertAttributeEquals("record.count", "1");
+        runner.clearTransferState();
+
+        // Run again, this time no flowfiles/rows should be transferred
+        runner.run();
+        runner.assertAllFlowFilesTransferred(QueryDatabaseTableRecord.REL_SUCCESS, 0);
+        runner.clearTransferState();
+
+        // Add a new row with a lower timestamp (but same millisecond value), no flow file should be transferred
+        stmt.execute("insert into TEST_QUERY_DB_TABLE (id, name, scale, created_on) VALUES (3, 'Mary West', 15.0, '2000-01-01 03:23:34.000')");
+        runner.run();
+        runner.assertAllFlowFilesTransferred(QueryDatabaseTableRecord.REL_SUCCESS, 0);
+        runner.clearTransferState();
+
+        // Add a new row with a higher timestamp, one flow file should be transferred
+        stmt.execute("insert into TEST_QUERY_DB_TABLE (id, name, scale, created_on) VALUES (3, 'Mary West', 15.0, '2000-01-01 03:23:34.0003')");
+        runner.run();
+        runner.assertAllFlowFilesTransferred(QueryDatabaseTableRecord.REL_SUCCESS, 1);
+        flowFile = runner.getFlowFilesForRelationship(QueryDatabaseTableRecord.REL_SUCCESS).get(0);
+        flowFile.assertAttributeEquals("record.count", "1");
+        runner.clearTransferState();
+
+        // Run again, this time no flowfiles/rows should be transferred
+        runner.run();
+        runner.assertAllFlowFilesTransferred(QueryDatabaseTableRecord.REL_SUCCESS, 0);
+        runner.clearTransferState();
+    }
+
+    @Test
+    public void testWithNullIntColumn() throws SQLException {
+        // load test data to database
+        final Connection con = ((DBCPService) runner.getControllerService("dbcp")).getConnection();
+        Statement stmt = con.createStatement();
+
+        try {
+            stmt.execute("drop table TEST_NULL_INT");
+        } catch (final SQLException sqle) {
+            // Ignore, usually due to Derby not having DROP TABLE IF EXISTS
+        }
+
+        stmt.execute("create table TEST_NULL_INT (id integer not null, val1 integer, val2 integer, constraint my_pk primary key (id))");
+
+        stmt.execute("insert into TEST_NULL_INT (id, val1, val2) VALUES (0, NULL, 1)");
+        stmt.execute("insert into TEST_NULL_INT (id, val1, val2) VALUES (1, 1, 1)");
+
+        runner.setIncomingConnection(false);
+        runner.setProperty(QueryDatabaseTableRecord.TABLE_NAME, "TEST_NULL_INT");
+        runner.run();
+
+        runner.assertAllFlowFilesTransferred(QueryDatabaseTableRecord.REL_SUCCESS, 1);
+        runner.getFlowFilesForRelationship(QueryDatabaseTableRecord.REL_SUCCESS).get(0).assertAttributeEquals(QueryDatabaseTableRecord.RESULT_ROW_COUNT, "2");
+    }
+
+    @Test
+    public void testWithRuntimeException() throws SQLException {
+        // load test data to database
+        final Connection con = ((DBCPService) runner.getControllerService("dbcp")).getConnection();
+        Statement stmt = con.createStatement();
+
+        try {
+            stmt.execute("drop table TEST_NULL_INT");
+        } catch (final SQLException sqle) {
+            // Ignore, usually due to Derby not having DROP TABLE IF EXISTS
+        }
+
+        stmt.execute("create table TEST_NULL_INT (id integer not null, val1 integer, val2 integer, constraint my_pk primary key (id))");
+
+        stmt.execute("insert into TEST_NULL_INT (id, val1, val2) VALUES (0, NULL, 1)");
+        stmt.execute("insert into TEST_NULL_INT (id, val1, val2) VALUES (1, 1, 1)");
+
+        runner.setIncomingConnection(false);
+        runner.setProperty(QueryDatabaseTableRecord.TABLE_NAME, "TEST_NULL_INT");
+        runner.setProperty(AbstractDatabaseFetchProcessor.MAX_VALUE_COLUMN_NAMES, "id");
+
+        QueryDatabaseTableRecord.dbAdapters.put(dbAdapter.getName(), new GenericDatabaseAdapter() {
+            @Override
+            public String getName() {
+                throw new RuntimeException("test");
+            }
+        });
+        runner.run();
+
+        assertTrue(runner.getFlowFilesForRelationship(QueryDatabaseTableRecord.REL_SUCCESS).isEmpty());
+    }
+
+    @Test
+    public void testWithSqlException() throws SQLException {
+        // load test data to database
+        final Connection con = ((DBCPService) runner.getControllerService("dbcp")).getConnection();
+        Statement stmt = con.createStatement();
+
+        try {
+            stmt.execute("drop table TEST_NO_ROWS");
+        } catch (final SQLException sqle) {
+            // Ignore, usually due to Derby not having DROP TABLE IF EXISTS
+        }
+
+        stmt.execute("create table TEST_NO_ROWS (id integer)");
+
+        runner.setIncomingConnection(false);
+        // Try a valid SQL statement that will generate an error (e.g., column val1 does not exist in TEST_NO_ROWS)
+        runner.setProperty(QueryDatabaseTableRecord.TABLE_NAME, "TEST_NO_ROWS");
+        runner.setProperty(QueryDatabaseTableRecord.COLUMN_NAMES, "val1");
+        runner.run();
+
+        assertTrue(runner.getFlowFilesForRelationship(QueryDatabaseTableRecord.REL_SUCCESS).isEmpty());
+    }
+
+    @Test
+    public void testOutputBatchSize() throws SQLException {
+
+        // load test data to database
+        final Connection con = ((DBCPService) runner.getControllerService("dbcp")).getConnection();
+        Statement stmt = con.createStatement();
+        MockFlowFile mff;
+
+        try {
+            stmt.execute("drop table TEST_QUERY_DB_TABLE");
+        } catch (final SQLException sqle) {
+            // Ignore this error, probably a "table does not exist" since Derby doesn't yet support DROP IF EXISTS [DERBY-4842]
+        }
+
+        stmt.execute("create table TEST_QUERY_DB_TABLE (id integer not null, name varchar(100), scale float, created_on timestamp, bignum bigint default 0)");
+        int rowCount = 0;
+        // Create larger row set
+        for (int batch = 0; batch < 100; batch++) {
+            stmt.execute("insert into TEST_QUERY_DB_TABLE (id, name, scale, created_on) VALUES (" + rowCount + ", 'Joe Smith', 1.0, '1962-09-23 03:23:34.234')");
+            rowCount++;
+        }
+
+        runner.setProperty(QueryDatabaseTableRecord.TABLE_NAME, "TEST_QUERY_DB_TABLE");
+        runner.setIncomingConnection(false);
+        runner.setProperty(QueryDatabaseTableRecord.MAX_VALUE_COLUMN_NAMES, "ID");
+        runner.setProperty(QueryDatabaseTableRecord.MAX_ROWS_PER_FLOW_FILE, "${" + MAX_ROWS_KEY + "}");
+        runner.setVariable(MAX_ROWS_KEY, "7");
+        runner.setProperty(QueryDatabaseTableRecord.OUTPUT_BATCH_SIZE, "${outputBatchSize}");
+        runner.setVariable("outputBatchSize", "4");
+
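+        // 100 rows at 7 rows per flow file yields 15 flow files (14 full plus one holding the remaining 2 rows),
+        // emitted in batches of 4 as the result set is processed, so fragment.count is never set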
+        runner.run();
+        runner.assertAllFlowFilesTransferred(QueryDatabaseTableRecord.REL_SUCCESS, 15);
+
+        // Ensure all but the last file have 7 records each
+        for (int ff = 0; ff < 14; ff++) {
+            mff = runner.getFlowFilesForRelationship(QueryDatabaseTableRecord.REL_SUCCESS).get(ff);
+            mff.assertAttributeEquals("record.count", "7");
+
+            mff.assertAttributeExists("fragment.identifier");
+            assertEquals(Integer.toString(ff), mff.getAttribute("fragment.index"));
+            // No fragment.count set for flow files sent when Output Batch Size is set
+            assertNull(mff.getAttribute("fragment.count"));
+        }
+
+        // Last file should have 2 records
+        mff = runner.getFlowFilesForRelationship(QueryDatabaseTableRecord.REL_SUCCESS).get(14);
+        mff.assertAttributeEquals("record.count", "2");
+        mff.assertAttributeExists("fragment.identifier");
+        assertEquals(Integer.toString(14), mff.getAttribute("fragment.index"));
+        // No fragment.count set for flow files sent when Output Batch Size is set
+        assertNull(mff.getAttribute("fragment.count"));
+    }
+
+    @Test
+    public void testMaxRowsPerFlowFile() throws IOException, SQLException {
+
+        // load test data to database
+        final Connection con = ((DBCPService) runner.getControllerService("dbcp")).getConnection();
+        Statement stmt = con.createStatement();
+        MockFlowFile mff;
+
+        try {
+            stmt.execute("drop table TEST_QUERY_DB_TABLE");
+        } catch (final SQLException sqle) {
+            // Ignore this error, probably a "table does not exist" since Derby doesn't yet support DROP IF EXISTS [DERBY-4842]
+        }
+
+        stmt.execute("create table TEST_QUERY_DB_TABLE (id integer not null, name varchar(100), scale float, created_on timestamp, bignum bigint default 0)");
+        int rowCount = 0;
+        // Create a larger row set
+        for (int batch = 0; batch < 100; batch++) {
+            stmt.execute("insert into TEST_QUERY_DB_TABLE (id, name, scale, created_on) VALUES (" + rowCount + ", 'Joe Smith', 1.0, '1962-09-23 03:23:34.234')");
+            rowCount++;
+        }
+
+        runner.setProperty(QueryDatabaseTableRecord.TABLE_NAME, "TEST_QUERY_DB_TABLE");
+        runner.setIncomingConnection(false);
+        runner.setProperty(QueryDatabaseTableRecord.MAX_VALUE_COLUMN_NAMES, "ID");
+        runner.setProperty(QueryDatabaseTableRecord.MAX_ROWS_PER_FLOW_FILE, "${" + MAX_ROWS_KEY + "}");
+        runner.setVariable(MAX_ROWS_KEY, "9");
+
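+        // 100 rows at 9 rows per flow file yields 12 flow files (11 full plus one holding the final row)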
+        runner.run();
+        runner.assertAllFlowFilesTransferred(QueryDatabaseTableRecord.REL_SUCCESS, 12);
+
+        // Ensure all but the last file have 9 records each
+        for (int ff = 0; ff < 11; ff++) {
+            mff = runner.getFlowFilesForRelationship(QueryDatabaseTableRecord.REL_SUCCESS).get(ff);
+            mff.assertAttributeEquals("record.count", "9");
+
+            mff.assertAttributeExists("fragment.identifier");
+            assertEquals(Integer.toString(ff), mff.getAttribute("fragment.index"));
+            assertEquals("12", mff.getAttribute("fragment.count"));
+        }
+
+        // The last file should have 1 record
+        mff = runner.getFlowFilesForRelationship(QueryDatabaseTableRecord.REL_SUCCESS).get(11);
+        mff.assertAttributeEquals("record.count", "1");
+        mff.assertAttributeExists("fragment.identifier");
+        assertEquals(Integer.toString(11), mff.getAttribute("fragment.index"));
+        assertEquals("12", mff.getAttribute("fragment.count"));
+        runner.clearTransferState();
+
+        // Run again, this time no flowfiles/rows should be transferred
+        runner.run();
+        runner.assertAllFlowFilesTransferred(QueryDatabaseTableRecord.REL_SUCCESS, 0);
+        runner.clearTransferState();
+
+        // Run again, this time should be a single partial flow file
+        for (int batch = 0; batch < 5; batch++) {
+            stmt.execute("insert into TEST_QUERY_DB_TABLE (id, name, scale, created_on) VALUES (" + rowCount + ", 'Joe Smith', 1.0, '1962-09-23 03:23:34.234')");
+            rowCount++;
+        }
+
+        runner.run();
+        runner.assertAllFlowFilesTransferred(QueryDatabaseTableRecord.REL_SUCCESS, 1);
+        mff = runner.getFlowFilesForRelationship(QueryDatabaseTableRecord.REL_SUCCESS).get(0);
+        mff.assertAttributeExists("fragment.identifier");
+        assertEquals(Integer.toString(0), mff.getAttribute("fragment.index"));
+        assertEquals("1", mff.getAttribute("fragment.count"));
+        mff.assertAttributeEquals("record.count", "5");
+        runner.clearTransferState();
+
+        // Run again, this time should be a full batch and a partial
+        for (int batch = 0; batch < 14; batch++) {
+            stmt.execute("insert into TEST_QUERY_DB_TABLE (id, name, scale, created_on) VALUES (" + rowCount + ", 'Joe Smith', 1.0, '1962-09-23 03:23:34.234')");
+            rowCount++;
+        }
+
+        runner.run();
+        runner.assertAllFlowFilesTransferred(QueryDatabaseTableRecord.REL_SUCCESS, 2);
+        mff = runner.getFlowFilesForRelationship(QueryDatabaseTableRecord.REL_SUCCESS).get(0);
+        mff.assertAttributeEquals("record.count", "9");
+        mff = runner.getFlowFilesForRelationship(QueryDatabaseTableRecord.REL_SUCCESS).get(1);
+        mff.assertAttributeEquals("record.count", "5");
+        runner.clearTransferState();
+
+        // Run again with a cleaned state. Should get all rows split into batches
+        int ffCount = (int) Math.ceil(rowCount / 9D);
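+        // rowCount is now 119 (100 + 5 + 14), so expect ceil(119 / 9) = 14 flow files, the last holding 119 % 9 = 2 records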
+        runner.getStateManager().clear(Scope.CLUSTER);
+        runner.run();
+        runner.assertAllFlowFilesTransferred(QueryDatabaseTableRecord.REL_SUCCESS, ffCount);
+
+        // Ensure all but the last file have 9 records each
+        for (int ff = 0; ff < ffCount - 1; ff++) {
+            mff = runner.getFlowFilesForRelationship(QueryDatabaseTableRecord.REL_SUCCESS).get(ff);
+            mff.assertAttributeEquals("record.count", "9");
+        }
+
+        mff = runner.getFlowFilesForRelationship(QueryDatabaseTableRecord.REL_SUCCESS).get(ffCount - 1);
+        mff.assertAttributeEquals("record.count", Integer.toString(rowCount % 9));
+        runner.clearTransferState();
+    }
+
+    @Test
+    public void testMaxRowsPerFlowFileWithMaxFragments() throws SQLException {
+
+        // load test data to database
+        final Connection con = ((DBCPService) runner.getControllerService("dbcp")).getConnection();
+        Statement stmt = con.createStatement();
+        MockFlowFile mff;
+
+        try {
+            stmt.execute("drop table TEST_QUERY_DB_TABLE");
+        } catch (final SQLException sqle) {
+            // Ignore this error, probably a "table does not exist" since Derby doesn't yet support DROP IF EXISTS [DERBY-4842]
+        }
+
+        stmt.execute("create table TEST_QUERY_DB_TABLE (id integer not null, name varchar(100), scale float, created_on timestamp, bignum bigint default 0)");
+        int rowCount = 0;
+        // Create a larger row set
+        for (int batch = 0; batch < 100; batch++) {
+            stmt.execute("insert into TEST_QUERY_DB_TABLE (id, name, scale, created_on) VALUES (" + rowCount + ", 'Joe Smith', 1.0, '1962-09-23 03:23:34.234')");
+            rowCount++;
+        }
+
+        runner.setProperty(QueryDatabaseTableRecord.TABLE_NAME, "TEST_QUERY_DB_TABLE");
+        runner.setIncomingConnection(false);
+        runner.setProperty(QueryDatabaseTableRecord.MAX_VALUE_COLUMN_NAMES, "ID");
+        runner.setProperty(QueryDatabaseTableRecord.MAX_ROWS_PER_FLOW_FILE, "9");
+        Integer maxFragments = 3;
+        runner.setProperty(QueryDatabaseTableRecord.MAX_FRAGMENTS, maxFragments.toString());
+
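+        // With Max Fragments set to 3, only the first 3 flow files (27 of the 100 rows) should be produced in this run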
+        runner.run();
+        runner.assertAllFlowFilesTransferred(QueryDatabaseTableRecord.REL_SUCCESS, maxFragments);
+
+        for (int i = 0; i < maxFragments; i++) {
+            mff = runner.getFlowFilesForRelationship(QueryDatabaseTableRecord.REL_SUCCESS).get(i);
+            mff.assertAttributeEquals("record.count", "9");
+
+            mff.assertAttributeExists("fragment.identifier");
+            assertEquals(Integer.toString(i), mff.getAttribute("fragment.index"));
+            assertEquals(maxFragments.toString(), mff.getAttribute("fragment.count"));
+        }
+
+        runner.clearTransferState();
+    }
+
+    @Test
+    public void testInitialMaxValue() throws SQLException {
+
+        // load test data to database
+        final Connection con = ((DBCPService) runner.getControllerService("dbcp")).getConnection();
+        Statement stmt = con.createStatement();
+
+        try {
+            stmt.execute("drop table TEST_QUERY_DB_TABLE");
+        } catch (final SQLException sqle) {
+            // Ignore this error, probably a "table does not exist" since Derby doesn't yet support DROP IF EXISTS [DERBY-4842]
+        }
+
+        stmt.execute("create table TEST_QUERY_DB_TABLE (id integer not null, name varchar(100), scale float, created_on timestamp, bignum bigint default 0)");
+
+        Calendar cal = Calendar.getInstance(TimeZone.getTimeZone("UTC"));
+        cal.setTimeInMillis(0);
+
+        SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
+        dateFormat.setTimeZone(TimeZone.getTimeZone("UTC"));
+
+        int rowCount = 0;
+        // Insert rows timestamped at one-minute intervals starting from the epoch (UTC)
+        for (int batch = 0; batch < 10; batch++) {
+            stmt.execute("insert into TEST_QUERY_DB_TABLE (id, name, scale, created_on) VALUES (" + rowCount + ", 'Joe Smith', 1.0, '" + dateFormat.format(cal.getTime().getTime()) + "')");
+
+            rowCount++;
+            cal.add(Calendar.MINUTE, 1);
+        }
+
+        runner.setProperty(QueryDatabaseTableRecord.TABLE_NAME, "${" + TABLE_NAME_KEY + "}");
+        runner.setVariable(TABLE_NAME_KEY, "TEST_QUERY_DB_TABLE");
+        runner.setIncomingConnection(false);
+        runner.setProperty(QueryDatabaseTableRecord.MAX_VALUE_COLUMN_NAMES, "created_on");
+
+        cal.setTimeInMillis(0);
+        cal.add(Calendar.MINUTE, 5);
+        runner.setProperty("initial.maxvalue.CREATED_ON", dateFormat.format(cal.getTime().getTime()));
+        // Initial run with no previous state; only the last 4 records (created_on after the 00:05 initial max) should be returned
+        runner.run();
+        runner.assertAllFlowFilesTransferred(QueryDatabaseTableRecord.REL_SUCCESS, 1);
+        MockFlowFile flowFile = runner.getFlowFilesForRelationship(QueryDatabaseTableRecord.REL_SUCCESS).get(0);
+        flowFile.assertAttributeEquals("record.count", "4");
+        runner.getStateManager().assertStateEquals("test_query_db_table" + AbstractDatabaseFetchProcessor.NAMESPACE_DELIMITER + "created_on", "1970-01-01 00:09:00.0", Scope.CLUSTER);
+        runner.clearTransferState();
+
+        // Run again, this time no flowfiles/rows should be transferred
+        // Validate Max Value doesn't change also
+        runner.run();
+        runner.assertAllFlowFilesTransferred(QueryDatabaseTableRecord.REL_SUCCESS, 0);
+        runner.getStateManager().assertStateEquals("test_query_db_table" + AbstractDatabaseFetchProcessor.NAMESPACE_DELIMITER + "created_on", "1970-01-01 00:09:00.0", Scope.CLUSTER);
+        runner.clearTransferState();
+    }
+
+    @Test
+    public void testInitialMaxValueWithEL() throws SQLException {
+
+        // load test data to database
+        final Connection con = ((DBCPService) runner.getControllerService("dbcp")).getConnection();
+        Statement stmt = con.createStatement();
+
+        try {
+            stmt.execute("drop table TEST_QUERY_DB_TABLE");
+        } catch (final SQLException sqle) {
+            // Ignore this error, probably a "table does not exist" since Derby doesn't yet support DROP IF EXISTS [DERBY-4842]
+        }
+
+        stmt.execute("create table TEST_QUERY_DB_TABLE (id integer not null, name varchar(100), scale float, created_on timestamp, bignum bigint default 0)");
+
+        Calendar cal = Calendar.getInstance(TimeZone.getTimeZone("UTC"));
+        cal.setTimeInMillis(0);
+
+        SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
+        dateFormat.setTimeZone(TimeZone.getTimeZone("UTC"));
+
+        int rowCount = 0;
+        // Insert rows timestamped at one-minute intervals starting from the epoch (UTC)
+        for (int batch = 0; batch < 10; batch++) {
+            stmt.execute("insert into TEST_QUERY_DB_TABLE (id, name, scale, created_on) VALUES (" + rowCount + ", 'Joe Smith', 1.0, '" + dateFormat.format(cal.getTime().getTime()) + "')");
+
+            rowCount++;
+            cal.add(Calendar.MINUTE, 1);
+        }
+
+        runner.setProperty(QueryDatabaseTableRecord.TABLE_NAME, "${" + TABLE_NAME_KEY + "}");
+        runner.setVariable(TABLE_NAME_KEY, "TEST_QUERY_DB_TABLE");
+        runner.setIncomingConnection(false);
+        runner.setProperty(QueryDatabaseTableRecord.MAX_VALUE_COLUMN_NAMES, "created_on");
+
+        cal.setTimeInMillis(0);
+        cal.add(Calendar.MINUTE, 5);
+        runner.setProperty("initial.maxvalue.CREATED_ON", "${created.on}");
+        runner.setVariable("created.on", dateFormat.format(cal.getTime().getTime()));
+        // Initial run with no previous state; only the last 4 records (created_on after the 00:05 initial max) should be returned
+        runner.run();
+        runner.assertAllFlowFilesTransferred(QueryDatabaseTableRecord.REL_SUCCESS, 1);
+        MockFlowFile flowFile = runner.getFlowFilesForRelationship(QueryDatabaseTableRecord.REL_SUCCESS).get(0);
+        flowFile.assertAttributeEquals("record.count", "4");
+        runner.getStateManager().assertStateEquals("test_query_db_table" + AbstractDatabaseFetchProcessor.NAMESPACE_DELIMITER + "created_on", "1970-01-01 00:09:00.0", Scope.CLUSTER);
+        runner.clearTransferState();
+
+        // Run again, this time no flowfiles/rows should be transferred
+        // Validate Max Value doesn't change also
+        runner.run();
+        runner.assertAllFlowFilesTransferred(QueryDatabaseTableRecord.REL_SUCCESS, 0);
+        runner.getStateManager().assertStateEquals("test_query_db_table" + AbstractDatabaseFetchProcessor.NAMESPACE_DELIMITER + "created_on", "1970-01-01 00:09:00.0", Scope.CLUSTER);
+        runner.clearTransferState();
+
+        // Append a new row; expect one flow file containing one row
+        cal.setTimeInMillis(0);
+        cal.add(Calendar.MINUTE, rowCount);
+        stmt.execute("insert into TEST_QUERY_DB_TABLE (id, name, scale, created_on) VALUES (" + rowCount + ", 'Joe Smith', 1.0, '" + dateFormat.format(cal.getTime().getTime()) + "')");
+
+        runner.run();
+        runner.assertAllFlowFilesTransferred(QueryDatabaseTableRecord.REL_SUCCESS, 1);
+        flowFile = runner.getFlowFilesForRelationship(QueryDatabaseTableRecord.REL_SUCCESS).get(0);
+        flowFile.assertAttributeEquals("record.count", "1");
+        runner.getStateManager().assertStateEquals("test_query_db_table" + AbstractDatabaseFetchProcessor.NAMESPACE_DELIMITER + "created_on", "1970-01-01 00:10:00.0", Scope.CLUSTER);
+        runner.clearTransferState();
+    }
+
+    @Test
+    public void testAddedRowsCustomWhereClause() throws SQLException, IOException {
+
+        // load test data to database
+        final Connection con = ((DBCPService) runner.getControllerService("dbcp")).getConnection();
+        Statement stmt = con.createStatement();
+
+        try {
+            stmt.execute("drop table TEST_QUERY_DB_TABLE");
+        } catch (final SQLException sqle) {
+            // Ignore this error, probably a "table does not exist" since Derby doesn't yet support DROP IF EXISTS [DERBY-4842]
+        }
+
+        stmt.execute("create table TEST_QUERY_DB_TABLE (id integer not null, type varchar(20), name varchar(100), scale float, created_on timestamp, bignum bigint default 0)");
+        stmt.execute("insert into TEST_QUERY_DB_TABLE (id, type, name, scale, created_on) VALUES (0, 'male', 'Joe Smith', 1.0, '1962-09-23 03:23:34.234')");
+        stmt.execute("insert into TEST_QUERY_DB_TABLE (id, type, name, scale, created_on) VALUES (1, 'female', 'Carrie Jones', 5.0, '2000-01-01 03:23:34.234')");
+        stmt.execute("insert into TEST_QUERY_DB_TABLE (id, type, name, scale, created_on) VALUES (2, NULL, NULL, 2.0, '2010-01-01 00:00:00')");
+
+        runner.setProperty(QueryDatabaseTableRecord.TABLE_NAME, "TEST_QUERY_DB_TABLE");
+        runner.setProperty(QueryDatabaseTableRecord.WHERE_CLAUSE, "type = 'male'");
+        runner.setIncomingConnection(false);
+        runner.setProperty(QueryDatabaseTableRecord.MAX_VALUE_COLUMN_NAMES, "ID");
+        runner.setProperty(QueryDatabaseTableRecord.MAX_ROWS_PER_FLOW_FILE, "2");
+
+        runner.run();
+        runner.assertAllFlowFilesTransferred(QueryDatabaseTableRecord.REL_SUCCESS, 1);
+
+        MockFlowFile flowFile = runner.getFlowFilesForRelationship(QueryDatabaseTableRecord.REL_SUCCESS).get(0);
+        assertEquals("TEST_QUERY_DB_TABLE", flowFile.getAttribute(QueryDatabaseTableRecord.RESULT_TABLENAME));
+        assertEquals("0", flowFile.getAttribute("maxvalue.id"));
+        flowFile.assertAttributeEquals("record.count", "1");
+
+        runner.clearTransferState();
+
+        // Run again, this time no flowfiles/rows should be transferred
+        runner.run();
+        runner.assertAllFlowFilesTransferred(QueryDatabaseTableRecord.REL_SUCCESS, 0);
+        runner.clearTransferState();
+
+        // Remove the Max Rows Per Flow File limit (0 means no limit)
+        runner.setProperty(QueryDatabaseTableRecord.MAX_ROWS_PER_FLOW_FILE, "0");
+
+        // Add a new row with a higher ID, but it does not match the WHERE clause (type is 'female'), so no flow file should be transferred
+        stmt.execute("insert into TEST_QUERY_DB_TABLE (id, type, name, scale, created_on) VALUES (3, 'female', 'Mary West', 15.0, '2000-01-01 03:23:34.234')");
+        runner.run();
+        runner.assertAllFlowFilesTransferred(QueryDatabaseTableRecord.REL_SUCCESS, 0);
+        runner.clearTransferState();
+
+        // Sanity check - run again, this time no flowfiles/rows should be transferred
+        runner.run();
+        runner.assertAllFlowFilesTransferred(QueryDatabaseTableRecord.REL_SUCCESS, 0);
+        runner.clearTransferState();
+
+        // Add timestamp as a max value column name
+        runner.setProperty(QueryDatabaseTableRecord.MAX_VALUE_COLUMN_NAMES, "id, created_on");
+
+        // Add a new row with a higher ID and run, one flow file will be transferred because no max value for the timestamp has been stored
+        stmt.execute("insert into TEST_QUERY_DB_TABLE (id, type, name, scale, created_on) VALUES (4, 'male', 'Marty Johnson', 15.0, '2011-01-01 03:23:34.234')");
+        runner.run();
+        runner.assertAllFlowFilesTransferred(QueryDatabaseTableRecord.REL_SUCCESS, 1);
+        flowFile = runner.getFlowFilesForRelationship(QueryDatabaseTableRecord.REL_SUCCESS).get(0);
+        assertEquals("4", flowFile.getAttribute("maxvalue.id"));
+        assertEquals("2011-01-01 03:23:34.234", flowFile.getAttribute("maxvalue.created_on"));
+        flowFile.assertAttributeEquals("record.count", "1");
+        runner.clearTransferState();
+
+        // Add a new row with a higher ID but lower timestamp and run, no flow file will be transferred
+        stmt.execute("insert into TEST_QUERY_DB_TABLE (id, type, name, scale, created_on) VALUES (5, 'male', 'NO NAME', 15.0, '2001-01-01 03:23:34.234')");
+        runner.run();
+        runner.assertAllFlowFilesTransferred(QueryDatabaseTableRecord.REL_SUCCESS, 0);
+        runner.clearTransferState();
+
+        // Add a new row with a higher ID and run, one flow file will be transferred
+        stmt.execute("insert into TEST_QUERY_DB_TABLE (id, type, name, scale, created_on) VALUES (6, 'male', 'Mr. NiFi', 1.0, '2012-01-01 03:23:34.234')");
+        runner.run();
+        runner.assertAllFlowFilesTransferred(QueryDatabaseTableRecord.REL_SUCCESS, 1);
+        flowFile = runner.getFlowFilesForRelationship(QueryDatabaseTableRecord.REL_SUCCESS).get(0);
+        flowFile.assertAttributeEquals("record.count", "1");
+        runner.clearTransferState();
+
+        // Set name as the max value column name (and clear the state), all rows should be returned since the max value for name has not been set
+        runner.getStateManager().clear(Scope.CLUSTER);
+        runner.setProperty(QueryDatabaseTableRecord.MAX_VALUE_COLUMN_NAMES, "name");
+        runner.run();
+        runner.assertAllFlowFilesTransferred(QueryDatabaseTableRecord.REL_SUCCESS, 1);
+        flowFile = runner.getFlowFilesForRelationship(QueryDatabaseTableRecord.REL_SUCCESS).get(0);
+        flowFile.assertAttributeEquals("record.count", "4");
+        runner.clearTransferState();
+
+        // Add a new row with a "higher" name than the max but lower than "NULL" (to test that null values are skipped), one flow file will be transferred
+        stmt.execute("insert into TEST_QUERY_DB_TABLE (id, type, name, scale, created_on) VALUES (7, 'male', 'NULK', 1.0, '2012-01-01 03:23:34.234')");
+        runner.run();
+        runner.assertAllFlowFilesTransferred(QueryDatabaseTableRecord.REL_SUCCESS, 1);
+        flowFile = runner.getFlowFilesForRelationship(QueryDatabaseTableRecord.REL_SUCCESS).get(0);
+        flowFile.assertAttributeEquals("record.count", "1");
+        runner.clearTransferState();
+
+        // Set scale as the max value column name (and clear the state); all rows should be returned since no max value for scale has been stored
+        runner.getStateManager().clear(Scope.CLUSTER);
+        runner.setProperty(QueryDatabaseTableRecord.MAX_VALUE_COLUMN_NAMES, "scale");
+        runner.run();
+        runner.assertAllFlowFilesTransferred(QueryDatabaseTableRecord.REL_SUCCESS, 1);
+        flowFile = runner.getFlowFilesForRelationship(QueryDatabaseTableRecord.REL_SUCCESS).get(0);
+        flowFile.assertAttributeEquals("record.count", "5");
+        runner.clearTransferState();
+
+        // Add a new row with a higher value for scale than the max, one flow file will be transferred
+        stmt.execute("insert into TEST_QUERY_DB_TABLE (id, type, name, scale, created_on) VALUES (8, 'male', 'NULK', 100.0, '2012-01-01 03:23:34.234')");
+        runner.run();
+        runner.assertAllFlowFilesTransferred(QueryDatabaseTableRecord.REL_SUCCESS, 1);
+        flowFile = runner.getFlowFilesForRelationship(QueryDatabaseTableRecord.REL_SUCCESS).get(0);
+        flowFile.assertAttributeEquals("record.count", "1");
+        runner.clearTransferState();
+
+        // Set bignum as the max value column name (and clear the state); all rows should be returned since no max value for bignum has been stored
+        runner.getStateManager().clear(Scope.CLUSTER);
+        runner.setProperty(QueryDatabaseTableRecord.MAX_VALUE_COLUMN_NAMES, "bignum");
+        runner.run();
+        runner.assertAllFlowFilesTransferred(QueryDatabaseTableRecord.REL_SUCCESS, 1);
+        flowFile = runner.getFlowFilesForRelationship(QueryDatabaseTableRecord.REL_SUCCESS).get(0);
+        flowFile.assertAttributeEquals("record.count", "6");
+        runner.clearTransferState();
+
+        // Add a new row with a higher bignum than the max, but it does not match the WHERE clause (type is 'female'), so no flow file will be transferred
+        stmt.execute("insert into TEST_QUERY_DB_TABLE (id, type, name, scale, created_on, bignum) VALUES (9, 'female', 'Alice Bob', 100.0, '2012-01-01 03:23:34.234', 1)");
+        runner.run();
+        runner.assertAllFlowFilesTransferred(QueryDatabaseTableRecord.REL_SUCCESS, 0);
+        runner.clearTransferState();
+    }
+
+    @Test
+    public void testCustomSQL() throws SQLException, IOException {
+
+        // load test data to database
+        final Connection con = ((DBCPService) runner.getControllerService("dbcp")).getConnection();
+        Statement stmt = con.createStatement();
+
+        try {
+            stmt.execute("drop table TEST_QUERY_DB_TABLE");
+        } catch (final SQLException sqle) {
+            // Ignore this error, probably a "table does not exist" since Derby doesn't yet support DROP IF EXISTS [DERBY-4842]
+        }
+
+        try {
+            stmt.execute("drop table TYPE_LIST");
+        } catch (final SQLException sqle) {
+            // Ignore this error, probably a "table does not exist" since Derby doesn't yet support DROP IF EXISTS [DERBY-4842]
+        }
+
+        stmt.execute("create table TEST_QUERY_DB_TABLE (id integer not null, type varchar(20), name varchar(100), scale float, created_on timestamp, bignum bigint default 0)");
+        stmt.execute("insert into TEST_QUERY_DB_TABLE (id, type, name, scale, created_on) VALUES (0, 'male', 'Joe Smith', 1.0, '1962-09-23 03:23:34.234')");
+        stmt.execute("insert into TEST_QUERY_DB_TABLE (id, type, name, scale, created_on) VALUES (1, 'female', 'Carrie Jones', 5.0, '2000-01-01 03:23:34.234')");
+        stmt.execute("insert into TEST_QUERY_DB_TABLE (id, type, name, scale, created_on) VALUES (2, NULL, NULL, 2.0, '2010-01-01 00:00:00')");
+
+        stmt.execute("create table TYPE_LIST (type_id integer not null, type varchar(20), descr varchar(255))");
+        stmt.execute("insert into TYPE_LIST (type_id, type,descr) VALUES (0, 'male', 'Man')");
+        stmt.execute("insert into TYPE_LIST (type_id, type,descr) VALUES (1, 'female', 'Woman')");
+        stmt.execute("insert into TYPE_LIST (type_id, type,descr) VALUES (2, '', 'Unspecified')");
+
+        runner.setProperty(QueryDatabaseTableRecord.TABLE_NAME, "TEST_QUERY_DB_TABLE");
+        runner.setProperty(QueryDatabaseTableRecord.SQL_QUERY,
+                "SELECT id, b.type as gender, b.descr, name, scale, created_on, bignum FROM TEST_QUERY_DB_TABLE a INNER JOIN TYPE_LIST b ON (a.type=b.type)");
+        runner.setProperty(QueryDatabaseTableRecord.WHERE_CLAUSE, "gender = 'male'");
+        runner.setIncomingConnection(false);
+        runner.setProperty(QueryDatabaseTableRecord.MAX_VALUE_COLUMN_NAMES, "ID");
+        runner.setProperty(QueryDatabaseTableRecord.MAX_ROWS_PER_FLOW_FILE, "2");
+
+        runner.run();
+        runner.assertAllFlowFilesTransferred(QueryDatabaseTableRecord.REL_SUCCESS, 1);
+
+        MockFlowFile flowFile = runner.getFlowFilesForRelationship(QueryDatabaseTableRecord.REL_SUCCESS).get(0);
+        assertEquals("TEST_QUERY_DB_TABLE", flowFile.getAttribute(QueryDatabaseTableRecord.RESULT_TABLENAME));
+        assertEquals("0", flowFile.getAttribute("maxvalue.id"));
+        flowFile.assertAttributeEquals("record.count", "1");
+
+        runner.clearTransferState();
+
+        // Run again, this time no flowfiles/rows should be transferred
+        runner.run();
+        runner.assertAllFlowFilesTransferred(QueryDatabaseTableRecord.REL_SUCCESS, 0);
+        runner.clearTransferState();
+
+        // Remove the Max Rows Per Flow File limit (0 means no limit)
+        runner.setProperty(QueryDatabaseTableRecord.MAX_ROWS_PER_FLOW_FILE, "0");
+
+        // Add a new row with a higher ID, but it does not match the WHERE clause (gender = 'male'), so no flow file should be transferred
+        stmt.execute("insert into TEST_QUERY_DB_TABLE (id, type, name, scale, created_on) VALUES (3, 'female', 'Mary West', 15.0, '2000-01-01 03:23:34.234')");
+        runner.run();
+        runner.assertAllFlowFilesTransferred(QueryDatabaseTableRecord.REL_SUCCESS, 0);
+        runner.clearTransferState();
+
+        // Sanity check - run again, this time no flowfiles/rows should be transferred
+        runner.run();
+        runner.assertAllFlowFilesTransferred(QueryDatabaseTableRecord.REL_SUCCESS, 0);
+        runner.clearTransferState();
+
+        // Add timestamp as a max value column name
+        runner.setProperty(QueryDatabaseTableRecord.MAX_VALUE_COLUMN_NAMES, "id, created_on");
+
+        // Add a new row with a higher ID and run, one flow file will be transferred because no max value for the timestamp has been stored
+        stmt.execute("insert into TEST_QUERY_DB_TABLE (id, type, name, scale, created_on) VALUES (4, 'male', 'Marty Johnson', 15.0, '2011-01-01 03:23:34.234')");
+        runner.run();
+        runner.assertAllFlowFilesTransferred(QueryDatabaseTableRecord.REL_SUCCESS, 1);
+        flowFile = runner.getFlowFilesForRelationship(QueryDatabaseTableRecord.REL_SUCCESS).get(0);
+        assertEquals("4", flowFile.getAttribute("maxvalue.id"));
+        assertEquals("2011-01-01 03:23:34.234", flowFile.getAttribute("maxvalue.created_on"));
+        flowFile.assertAttributeEquals("record.count", "1");
+        runner.clearTransferState();
+
+        // Add a new row with a higher ID but lower timestamp and run, no flow file will be transferred
+        stmt.execute("insert into TEST_QUERY_DB_TABLE (id, type, name, scale, created_on) VALUES (5, 'male', 'NO NAME', 15.0, '2001-01-01 03:23:34.234')");
+        runner.run();
+        runner.assertAllFlowFilesTransferred(QueryDatabaseTableRecord.REL_SUCCESS, 0);
+        runner.clearTransferState();
+
+        // Add a new row with a higher ID and run, one flow file will be transferred
+        stmt.execute("insert into TEST_QUERY_DB_TABLE (id, type, name, scale, created_on) VALUES (6, 'male', 'Mr. NiFi', 1.0, '2012-01-01 03:23:34.234')");
+        runner.run();
+        runner.assertAllFlowFilesTransferred(QueryDatabaseTableRecord.REL_SUCCESS, 1);
+        flowFile = runner.getFlowFilesForRelationship(QueryDatabaseTableRecord.REL_SUCCESS).get(0);
+        flowFile.assertAttributeEquals("record.count", "1");
+        runner.clearTransferState();
+
+        // Set name as the max value column name (and clear the state), all rows should be returned since the max value for name has not been set
+        runner.getStateManager().clear(Scope.CLUSTER);
+        runner.setProperty(QueryDatabaseTableRecord.MAX_VALUE_COLUMN_NAMES, "name");
+        runner.run();
+        runner.assertAllFlowFilesTransferred(QueryDatabaseTableRecord.REL_SUCCESS, 1);
+        flowFile = runner.getFlowFilesForRelationship(QueryDatabaseTableRecord.REL_SUCCESS).get(0);
+        flowFile.assertAttributeEquals("record.count", "4");
+        runner.clearTransferState();
+
+        // Add a new row with a "higher" name than the max but lower than "NULL" (to test that null values are skipped), one flow file will be transferred
+        stmt.execute("insert into TEST_QUERY_DB_TABLE (id, type, name, scale, created_on) VALUES (7, 'male', 'NULK', 1.0, '2012-01-01 03:23:34.234')");
+        runner.run();
+        runner.assertAllFlowFilesTransferred(QueryDatabaseTableRecord.REL_SUCCESS, 1);
+        flowFile = runner.getFlowFilesForRelationship(QueryDatabaseTableRecord.REL_SUCCESS).get(0);
+        flowFile.assertAttributeEquals("record.count", "1");
+        runner.clearTransferState();
+
+        // Set scale as the max value column name (and clear the state); all rows should be returned since no max value for scale has been stored
+        runner.getStateManager().clear(Scope.CLUSTER);
+        runner.setProperty(QueryDatabaseTableRecord.MAX_VALUE_COLUMN_NAMES, "scale");
+        runner.run();
+        runner.assertAllFlowFilesTransferred(QueryDatabaseTableRecord.REL_SUCCESS, 1);
+        flowFile = runner.getFlowFilesForRelationship(QueryDatabaseTableRecord.REL_SUCCESS).get(0);
+        flowFile.assertAttributeEquals("record.count", "5");
+        runner.clearTransferState();
+
+        // Add a new row with a higher value for scale than the max, one flow file will be transferred
+        stmt.execute("insert into TEST_QUERY_DB_TABLE (id, type, name, scale, created_on) VALUES (8, 'male', 'NULK', 100.0, '2012-01-01 03:23:34.234')");
+        runner.run();
+        runner.assertAllFlowFilesTransferred(QueryDatabaseTableRecord.REL_SUCCESS, 1);
+        flowFile = runner.getFlowFilesForRelationship(QueryDatabaseTableRecord.REL_SUCCESS).get(0);
+        flowFile.assertAttributeEquals("record.count", "1");
+        runner.clearTransferState();
+
+        // Set bignum as the max value column name (and clear the state); all rows should be returned since no max value for bignum has been stored
+        runner.getStateManager().clear(Scope.CLUSTER);
+        runner.setProperty(QueryDatabaseTableRecord.MAX_VALUE_COLUMN_NAMES, "bignum");
+        runner.run();
+        runner.assertAllFlowFilesTransferred(QueryDatabaseTableRecord.REL_SUCCESS, 1);
+        flowFile = runner.getFlowFilesForRelationship(QueryDatabaseTableRecord.REL_SUCCESS).get(0);
+        flowFile.assertAttributeEquals("record.count", "6");
+        runner.clearTransferState();
+
+        // Add a new row with a higher bignum than the max, but it does not match the WHERE clause (gender is 'female'), so no flow file will be transferred
+        stmt.execute("insert into TEST_QUERY_DB_TABLE (id, type, name, scale, created_on, bignum) VALUES (9, 'female', 'Alice Bob', 100.0, '2012-01-01 03:23:34.234', 1)");
+        runner.run();
+        runner.assertAllFlowFilesTransferred(QueryDatabaseTableRecord.REL_SUCCESS, 0);
+        runner.clearTransferState();
+    }
+
+    @Test(expected = AssertionError.class)
+    public void testMissingColumn() throws ProcessException, SQLException {
+        // load test data to database
+        final Connection con = ((DBCPService) runner.getControllerService("dbcp")).getConnection();
+        Statement stmt = con.createStatement();
+
+        try {
+            stmt.execute("drop table TEST_QUERY_DB_TABLE");
+        } catch (final SQLException sqle) {
+            // Ignore this error, probably a "table does not exist" since Derby doesn't yet support DROP IF EXISTS [DERBY-4842]
+        }
+
+        try {
+            stmt.execute("drop table TYPE_LIST");
+        } catch (final SQLException sqle) {
+            // Ignore this error, probably a "table does not exist" since Derby doesn't yet support DROP IF EXISTS [DERBY-4842]
+        }
+
+        stmt.execute("create table TEST_QUERY_DB_TABLE (id integer not null, type varchar(20), name varchar(100), scale float, created_on timestamp, bignum bigint default 0)");
+        stmt.execute("insert into TEST_QUERY_DB_TABLE (id, type, name, scale, created_on) VALUES (0, 'male', 'Joe Smith', 1.0, '1962-09-23 03:23:34.234')");
+        stmt.execute("insert into TEST_QUERY_DB_TABLE (id, type, name, scale, created_on) VALUES (1, 'female', 'Carrie Jones', 5.0, '2000-01-01 03:23:34.234')");
+        stmt.execute("insert into TEST_QUERY_DB_TABLE (id, type, name, scale, created_on) VALUES (2, NULL, NULL, 2.0, '2010-01-01 00:00:00')");
+
+        stmt.execute("create table TYPE_LIST (type_id integer not null, type varchar(20), descr varchar(255))");
+        stmt.execute("insert into TYPE_LIST (type_id, type,descr) VALUES (0, 'male', 'Man')");
+        stmt.execute("insert into TYPE_LIST (type_id, type,descr) VALUES (1, 'female', 'Woman')");
+        stmt.execute("insert into TYPE_LIST (type_id, type,descr) VALUES (2, '', 'Unspecified')");
+
+        runner.setProperty(QueryDatabaseTableRecord.TABLE_NAME, "TYPE_LIST");
+        runner.setProperty(QueryDatabaseTableRecord.SQL_QUERY, "SELECT b.type, b.descr, name, scale, created_on, bignum FROM TEST_QUERY_DB_TABLE a INNER JOIN TYPE_LIST b ON (a.type=b.type)");
+        runner.setProperty(QueryDatabaseTableRecord.WHERE_CLAUSE, "type = 'male'");
+        runner.setIncomingConnection(false);
+        runner.setProperty(QueryDatabaseTableRecord.MAX_VALUE_COLUMN_NAMES, "ID");
+        runner.setProperty(QueryDatabaseTableRecord.MAX_ROWS_PER_FLOW_FILE, "2");
+
+        runner.run();
+    }
+
+    @Test
+    public void testWithExceptionAfterSomeRowsProcessed() throws SQLException {
+        // load test data to database
+        final Connection con = ((DBCPService) runner.getControllerService("dbcp")).getConnection();
+        Statement stmt = con.createStatement();
+
+        try {
+            stmt.execute("drop table TEST_NULL_INT");
+        } catch (final SQLException sqle) {
+            // Ignore, usually due to Derby not having DROP TABLE IF EXISTS
+        }
+
+        stmt.execute("create table TEST_NULL_INT (id integer not null, val1 integer, val2 integer, constraint my_pk primary key (id))");
+
+        stmt.execute("insert into TEST_NULL_INT (id, val1, val2) VALUES (1, NULL, 1)");
+        stmt.execute("insert into TEST_NULL_INT (id, val1, val2) VALUES (2, 1, 1)");
+
+        runner.setIncomingConnection(false);
+        runner.setProperty(QueryDatabaseTableRecord.TABLE_NAME, "TEST_NULL_INT");
+        runner.setProperty(AbstractDatabaseFetchProcessor.MAX_VALUE_COLUMN_NAMES, "id");
+
+        // Override adapter with one that fails after the first row is processed
+        QueryDatabaseTableRecord.dbAdapters.put(dbAdapter.getName(), new GenericDatabaseAdapter() {
+            boolean fail = false;
+
+            @Override
+            public String getName() {
+                if (!fail) {
+                    fail = true;
+                    return super.getName();
+                }
+                throw new RuntimeException("test");
+            }
+        });
+        runner.run();
+        assertTrue(runner.getFlowFilesForRelationship(QueryDatabaseTableRecord.REL_SUCCESS).isEmpty());
+        // State should not have been updated
+        runner.getStateManager().assertStateNotSet("test_null_int@!@id", Scope.CLUSTER);
+
+        // Restore original (working) adapter and run again
+        QueryDatabaseTableRecord.dbAdapters.put(dbAdapter.getName(), dbAdapter);
+        runner.run();
+        assertFalse(runner.getFlowFilesForRelationship(QueryDatabaseTableRecord.REL_SUCCESS).isEmpty());
+        runner.getStateManager().assertStateEquals("test_null_int@!@id", "2", Scope.CLUSTER);
+    }
+
+    /**
+     * Simple implementation only for QueryDatabaseTableRecord processor testing.
+     */
+    private class DBCPServiceSimpleImpl extends AbstractControllerService implements DBCPService {
+
+        @Override
+        public String getIdentifier() {
+            return "dbcp";
+        }
+
+        @Override
+        public Connection getConnection() throws ProcessException {
+            try {
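+                // Embedded Derby database, created on demand at DB_LOCATION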
+                Class.forName("org.apache.derby.jdbc.EmbeddedDriver");
+                return DriverManager.getConnection("jdbc:derby:" + DB_LOCATION + ";create=true");
+            } catch (final Exception e) {
+                throw new ProcessException("getConnection failed: " + e);
+            }
+        }
+    }
+
+    @Stateful(scopes = Scope.CLUSTER, description = "Mock for QueryDatabaseTableRecord processor")
+    private static class MockQueryDatabaseTableRecord extends QueryDatabaseTableRecord {
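+        // Exposes the processor's columnTypeMap so tests can pre-populate column types without querying the database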
+        void putColumnType(String colName, Integer colType) {
+            columnTypeMap.put(colName, colType);
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/nifi/blob/c6572f04/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/QueryDatabaseTableTest.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/QueryDatabaseTableTest.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/QueryDatabaseTableTest.java
index 1624c6d..8b51fe2 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/QueryDatabaseTableTest.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/QueryDatabaseTableTest.java
@@ -38,13 +38,13 @@ import org.apache.nifi.util.MockFlowFile;
 import org.apache.nifi.util.TestRunner;
 import org.apache.nifi.util.TestRunners;
 import org.apache.nifi.util.file.FileUtils;
-import org.fusesource.hawtbuf.ByteArrayInputStream;
 import org.junit.After;
 import org.junit.AfterClass;
 import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Test;
 
+import java.io.ByteArrayInputStream;
 import java.io.File;
 import java.io.IOException;
 import java.io.InputStream;

http://git-wip-us.apache.org/repos/asf/nifi/blob/c6572f04/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExecuteSQL.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExecuteSQL.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExecuteSQL.java
index 33633f2..63de91a 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExecuteSQL.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExecuteSQL.java
@@ -22,6 +22,7 @@ import static org.mockito.Matchers.anyString;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
+import java.io.ByteArrayInputStream;
 import java.io.File;
 import java.io.IOException;
 import java.io.InputStream;
@@ -51,7 +52,6 @@ import org.apache.nifi.reporting.InitializationException;
 import org.apache.nifi.util.MockFlowFile;
 import org.apache.nifi.util.TestRunner;
 import org.apache.nifi.util.TestRunners;
-import org.fusesource.hawtbuf.ByteArrayInputStream;
 import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Test;
@@ -352,6 +352,48 @@ public class TestExecuteSQL {
     }
 
     @Test
+    public void testNoRowsStatementCreatesEmptyFlowFile() throws Exception {
+        // remove previous test database, if any
+        final File dbLocation = new File(DB_LOCATION);
+        dbLocation.delete();
+
+        // load test data to database
+        final Connection con = ((DBCPService) runner.getControllerService("dbcp")).getConnection();
+        Statement stmt = con.createStatement();
+
+        try {
+            stmt.execute("drop table TEST_NULL_INT");
+        } catch (final SQLException sqle) {
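+            // Ignore, usually due to Derby not having DROP TABLE IF EXISTS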
+        }
+
+        stmt.execute("create table TEST_NULL_INT (id integer not null, val1 integer, val2 integer, constraint my_pk primary key (id))");
+
+        runner.setIncomingConnection(true);
+        runner.setProperty(ExecuteSQL.SQL_SELECT_QUERY, "select * from TEST_NULL_INT");
+        runner.enqueue("Hello".getBytes());
+        runner.run();
+
+        runner.assertAllFlowFilesTransferred(ExecuteSQL.REL_SUCCESS, 1);
+        MockFlowFile firstFlowFile = runner.getFlowFilesForRelationship(ExecuteSQL.REL_SUCCESS).get(0);
+        firstFlowFile.assertAttributeEquals(ExecuteSQL.RESULT_ROW_COUNT, "0");
+        final InputStream in = new ByteArrayInputStream(firstFlowFile.toByteArray());
+        final DatumReader<GenericRecord> datumReader = new GenericDatumReader<>();
+        try (DataFileStream<GenericRecord> dataFileReader = new DataFileStream<>(in, datumReader)) {
+            GenericRecord record = null;
+            long recordsFromStream = 0;
+            while (dataFileReader.hasNext()) {
+                // Reuse record object by passing it to next(). This saves us from
+                // allocating and garbage collecting many objects for files with
+                // many items.
+                record = dataFileReader.next(record);
+                recordsFromStream += 1;
+            }
+
+            assertEquals(0, recordsFromStream);
+        }
+    }
+
+    @Test
     public void testWithDuplicateColumns() throws SQLException {
         // remove previous test database, if any
         final File dbLocation = new File(DB_LOCATION);


[3/4] nifi git commit: NIFI-4517: Added ExecuteSQLRecord and QueryDatabaseTableRecord processors

Posted by pv...@apache.org.
http://git-wip-us.apache.org/repos/asf/nifi/blob/c6572f04/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteSQLRecord.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteSQLRecord.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteSQLRecord.java
new file mode 100644
index 0000000..31d0ec8
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteSQLRecord.java
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.standard;
+
+import org.apache.nifi.annotation.behavior.EventDriven;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
+import org.apache.nifi.annotation.behavior.ReadsAttribute;
+import org.apache.nifi.annotation.behavior.ReadsAttributes;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processors.standard.sql.RecordSqlWriter;
+import org.apache.nifi.processors.standard.sql.SqlWriter;
+import org.apache.nifi.processors.standard.util.JdbcCommon;
+import org.apache.nifi.serialization.RecordSetWriterFactory;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import static org.apache.nifi.processors.standard.util.JdbcCommon.USE_AVRO_LOGICAL_TYPES;
+
+@EventDriven
+@InputRequirement(Requirement.INPUT_ALLOWED)
+@Tags({"sql", "select", "jdbc", "query", "database", "record"})
+@CapabilityDescription("Executes provided SQL select query. Query result will be converted to the format specified by a Record Writer. "
+        + "Streaming is used so arbitrarily large result sets are supported. This processor can be scheduled to run on "
+        + "a timer, or cron expression, using the standard scheduling methods, or it can be triggered by an incoming FlowFile. "
+        + "If it is triggered by an incoming FlowFile, then attributes of that FlowFile will be available when evaluating the "
+        + "select query, and the query may use the ? to escape parameters. In this case, the parameters to use must exist as FlowFile attributes "
+        + "with the naming convention sql.args.N.type and sql.args.N.value, where N is a positive integer. The sql.args.N.type is expected to be "
+        + "a number indicating the JDBC Type. The content of the FlowFile is expected to be in UTF-8 format. "
+        + "FlowFile attribute 'executesql.row.count' indicates how many rows were selected.")
+@ReadsAttributes({
+        @ReadsAttribute(attribute = "sql.args.N.type", description = "Incoming FlowFiles are expected to be parametrized SQL statements. The type of each Parameter is specified as an integer "
+                + "that represents the JDBC Type of the parameter."),
+        @ReadsAttribute(attribute = "sql.args.N.value", description = "Incoming FlowFiles are expected to be parametrized SQL statements. The value of the Parameters are specified as "
+                + "sql.args.1.value, sql.args.2.value, sql.args.3.value, and so on. The type of the sql.args.1.value Parameter is specified by the sql.args.1.type attribute."),
+        @ReadsAttribute(attribute = "sql.args.N.format", description = "This attribute is always optional, but default options may not always work for your data. "
+                + "Incoming FlowFiles are expected to be parametrized SQL statements. In some cases "
+                + "a format option needs to be specified, currently this is only applicable for binary data types, dates, times and timestamps. Binary Data Types (defaults to 'ascii') - "
+                + "ascii: each string character in your attribute value represents a single byte. This is the format provided by Avro Processors. "
+                + "base64: the string is a Base64 encoded string that can be decoded to bytes. "
+                + "hex: the string is hex encoded with all letters in upper case and no '0x' at the beginning. "
+                + "Dates/Times/Timestamps - "
+                + "Date, Time and Timestamp formats all support both custom formats or named format ('yyyy-MM-dd','ISO_OFFSET_DATE_TIME') "
+                + "as specified according to java.time.format.DateTimeFormatter. "
+                + "If not specified, a long value input is expected to be an unix epoch (milli seconds from 1970/1/1), or a string value in "
+                + "'yyyy-MM-dd' format for Date, 'HH:mm:ss.SSS' for Time (some database engines e.g. Derby or MySQL do not support milliseconds and will truncate milliseconds), "
+                + "'yyyy-MM-dd HH:mm:ss.SSS' for Timestamp is used.")
+})
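+// Illustrative usage (hypothetical table and values): an incoming FlowFile whose content is
+//   SELECT * FROM PERSONS WHERE ID = ?
+// could supply the parameter through attributes such as
+//   sql.args.1.type  = 4    (java.sql.Types.INTEGER)
+//   sql.args.1.value = 42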
+@WritesAttributes({
+        @WritesAttribute(attribute = "executesql.row.count", description = "Contains the number of rows returned in the select query"),
+        @WritesAttribute(attribute = "executesql.query.duration", description = "Combined duration of the query execution time and fetch time in milliseconds"),
+        @WritesAttribute(attribute = "executesql.query.executiontime", description = "Duration of the query execution time in milliseconds"),
+        @WritesAttribute(attribute = "executesql.query.fetchtime", description = "Duration of the result set fetch time in milliseconds"),
+        @WritesAttribute(attribute = "executesql.resultset.index", description = "Assuming multiple result sets are returned, "
+                + "the zero based index of this result set."),
+        @WritesAttribute(attribute = "fragment.identifier", description = "If 'Max Rows Per Flow File' is set then all FlowFiles from the same query result set "
+                + "will have the same value for the fragment.identifier attribute. This can then be used to correlate the results."),
+        @WritesAttribute(attribute = "fragment.count", description = "If 'Max Rows Per Flow File' is set then this is the total number of  "
+                + "FlowFiles produced by a single ResultSet. This can be used in conjunction with the "
+                + "fragment.identifier attribute in order to know how many FlowFiles belonged to the same incoming ResultSet. If Output Batch Size is set, then this "
+                + "attribute will not be populated."),
+        @WritesAttribute(attribute = "fragment.index", description = "If 'Max Rows Per Flow File' is set then the position of this FlowFile in the list of "
+                + "outgoing FlowFiles that were all derived from the same result set FlowFile. This can be "
+                + "used in conjunction with the fragment.identifier attribute to know which FlowFiles originated from the same query result set and in what order  "
+                + "FlowFiles were produced"),
+        @WritesAttribute(attribute = "mime.type", description = "Sets the mime.type attribute to the MIME Type specified by the Record Writer."),
+        @WritesAttribute(attribute = "record.count", description = "The number of records output by the Record Writer.")
+})
+public class ExecuteSQLRecord extends AbstractExecuteSQL {
+
+
+    public static final PropertyDescriptor RECORD_WRITER_FACTORY = new PropertyDescriptor.Builder()
+            .name("esqlrecord-record-writer")
+            .displayName("Record Writer")
+            .description("Specifies the Controller Service to use for writing results to a FlowFile. The Record Writer may use Inherit Schema to emulate the inferred schema behavior, i.e. "
+                    + "an explicit schema need not be defined in the writer, and will be supplied by the same logic used to infer the schema from the column types.")
+            .identifiesControllerService(RecordSetWriterFactory.class)
+            .required(true)
+            .build();
+
+    public static final PropertyDescriptor NORMALIZE_NAMES = new PropertyDescriptor.Builder()
+            .name("esqlrecord-normalize")
+            .displayName("Normalize Table/Column Names")
+            .description("Whether to change characters in column names. For example, colons and periods will be changed to underscores.")
+            .allowableValues("true", "false")
+            .defaultValue("false")
+            .required(true)
+            .build();
+
+    public ExecuteSQLRecord() {
+        final Set<Relationship> r = new HashSet<>();
+        r.add(REL_SUCCESS);
+        r.add(REL_FAILURE);
+        relationships = Collections.unmodifiableSet(r);
+
+        final List<PropertyDescriptor> pds = new ArrayList<>();
+        pds.add(DBCP_SERVICE);
+        pds.add(SQL_SELECT_QUERY);
+        pds.add(QUERY_TIMEOUT);
+        pds.add(RECORD_WRITER_FACTORY);
+        pds.add(NORMALIZE_NAMES);
+        pds.add(USE_AVRO_LOGICAL_TYPES);
+        pds.add(MAX_ROWS_PER_FLOW_FILE);
+        pds.add(OUTPUT_BATCH_SIZE);
+        propDescriptors = Collections.unmodifiableList(pds);
+    }
+
+    @Override
+    protected SqlWriter configureSqlWriter(ProcessSession session, ProcessContext context, FlowFile fileToProcess) {
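+        // Translate the processor properties into Avro conversion options and wrap the configured Record Writer in a RecordSqlWriter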
+        final Integer maxRowsPerFlowFile = context.getProperty(MAX_ROWS_PER_FLOW_FILE).evaluateAttributeExpressions().asInteger();
+        final boolean convertNamesForAvro = context.getProperty(NORMALIZE_NAMES).asBoolean();
+        final Boolean useAvroLogicalTypes = context.getProperty(USE_AVRO_LOGICAL_TYPES).asBoolean();
+        final JdbcCommon.AvroConversionOptions options = JdbcCommon.AvroConversionOptions.builder()
+                .convertNames(convertNamesForAvro)
+                .useLogicalTypes(useAvroLogicalTypes)
+                .build();
+        final RecordSetWriterFactory recordSetWriterFactory = context.getProperty(RECORD_WRITER_FACTORY).asControllerService(RecordSetWriterFactory.class);
+
+        return new RecordSqlWriter(recordSetWriterFactory, options, maxRowsPerFlowFile, fileToProcess == null ? Collections.emptyMap() : fileToProcess.getAttributes());
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/c6572f04/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryDatabaseTable.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryDatabaseTable.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryDatabaseTable.java
index 1923e2c..71348ef 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryDatabaseTable.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryDatabaseTable.java
@@ -16,7 +16,6 @@
  */
 package org.apache.nifi.processors.standard;
 
-import org.apache.commons.lang3.StringUtils;
 import org.apache.nifi.annotation.behavior.DynamicProperty;
 import org.apache.nifi.annotation.behavior.InputRequirement;
 import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
@@ -27,49 +26,21 @@ import org.apache.nifi.annotation.behavior.WritesAttributes;
 import org.apache.nifi.annotation.documentation.CapabilityDescription;
 import org.apache.nifi.annotation.documentation.SeeAlso;
 import org.apache.nifi.annotation.documentation.Tags;
-import org.apache.nifi.annotation.lifecycle.OnScheduled;
-import org.apache.nifi.annotation.lifecycle.OnStopped;
 import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.components.state.Scope;
-import org.apache.nifi.components.state.StateManager;
-import org.apache.nifi.components.state.StateMap;
-import org.apache.nifi.dbcp.DBCPService;
-import org.apache.nifi.expression.AttributeExpression;
 import org.apache.nifi.expression.ExpressionLanguageScope;
-import org.apache.nifi.flowfile.FlowFile;
-import org.apache.nifi.flowfile.attributes.CoreAttributes;
-import org.apache.nifi.flowfile.attributes.FragmentAttributes;
-import org.apache.nifi.logging.ComponentLog;
 import org.apache.nifi.processor.ProcessContext;
 import org.apache.nifi.processor.ProcessSession;
-import org.apache.nifi.processor.ProcessSessionFactory;
 import org.apache.nifi.processor.Relationship;
-import org.apache.nifi.processor.exception.ProcessException;
-import org.apache.nifi.processor.util.StandardValidators;
-import org.apache.nifi.processors.standard.db.DatabaseAdapter;
+import org.apache.nifi.processors.standard.sql.DefaultAvroSqlWriter;
+import org.apache.nifi.processors.standard.sql.SqlWriter;
 import org.apache.nifi.processors.standard.util.JdbcCommon;
-import org.apache.nifi.util.StopWatch;
 
-import java.io.IOException;
-import java.sql.Connection;
-import java.sql.DatabaseMetaData;
-import java.sql.ResultSet;
-import java.sql.ResultSetMetaData;
-import java.sql.SQLException;
-import java.sql.Statement;
-import java.text.ParseException;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.Collections;
-import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
-import java.util.Map;
 import java.util.Set;
-import java.util.UUID;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.stream.IntStream;
 
 import static org.apache.nifi.processors.standard.util.JdbcCommon.DEFAULT_PRECISION;
 import static org.apache.nifi.processors.standard.util.JdbcCommon.DEFAULT_SCALE;
@@ -112,60 +83,7 @@ import static org.apache.nifi.processors.standard.util.JdbcCommon.USE_AVRO_LOGIC
 @DynamicProperty(name = "initial.maxvalue.<max_value_column>", value = "Initial maximum value for the specified column",
         expressionLanguageScope = ExpressionLanguageScope.VARIABLE_REGISTRY, description = "Specifies an initial max value for max value column(s). Properties should "
         + "be added in the format `initial.maxvalue.<max_value_column>`. This value is only used the first time the table is accessed (when a Maximum Value Column is specified).")
-public class QueryDatabaseTable extends AbstractDatabaseFetchProcessor {
-
-    public static final String RESULT_TABLENAME = "tablename";
-    public static final String RESULT_ROW_COUNT = "querydbtable.row.count";
-
-    public static final String FRAGMENT_ID = FragmentAttributes.FRAGMENT_ID.key();
-    public static final String FRAGMENT_INDEX = FragmentAttributes.FRAGMENT_INDEX.key();
-
-    public static final PropertyDescriptor FETCH_SIZE = new PropertyDescriptor.Builder()
-            .name("Fetch Size")
-            .description("The number of result rows to be fetched from the result set at a time. This is a hint to the database driver and may not be "
-                    + "honored and/or exact. If the value specified is zero, then the hint is ignored.")
-            .defaultValue("0")
-            .required(true)
-            .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
-            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
-            .build();
-
-    public static final PropertyDescriptor MAX_ROWS_PER_FLOW_FILE = new PropertyDescriptor.Builder()
-            .name("qdbt-max-rows")
-            .displayName("Max Rows Per Flow File")
-            .description("The maximum number of result rows that will be included in a single FlowFile. This will allow you to break up very large "
-                    + "result sets into multiple FlowFiles. If the value specified is zero, then all rows are returned in a single FlowFile.")
-            .defaultValue("0")
-            .required(true)
-            .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
-            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
-            .build();
-
-    public static final PropertyDescriptor OUTPUT_BATCH_SIZE = new PropertyDescriptor.Builder()
-            .name("qdbt-output-batch-size")
-            .displayName("Output Batch Size")
-            .description("The number of output FlowFiles to queue before committing the process session. When set to zero, the session will be committed when all result set rows "
-                    + "have been processed and the output FlowFiles are ready for transfer to the downstream relationship. For large result sets, this can cause a large burst of FlowFiles "
-                    + "to be transferred at the end of processor execution. If this property is set, then when the specified number of FlowFiles are ready for transfer, then the session will "
-                    + "be committed, thus releasing the FlowFiles to the downstream relationship. NOTE: The maxvalue.* and fragment.count attributes will not be set on FlowFiles when this "
-                    + "property is set.")
-            .defaultValue("0")
-            .required(true)
-            .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
-            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
-            .build();
-
-    public static final PropertyDescriptor MAX_FRAGMENTS = new PropertyDescriptor.Builder()
-            .name("qdbt-max-frags")
-            .displayName("Maximum Number of Fragments")
-            .description("The maximum number of fragments. If the value specified is zero, then all fragments are returned. " +
-                    "This prevents OutOfMemoryError when this processor ingests huge table. NOTE: Setting this property can result in data loss, as the incoming results are "
-                    + "not ordered, and fragments may end at arbitrary boundaries where rows are not included in the result set.")
-            .defaultValue("0")
-            .required(true)
-            .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
-            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
-            .build();
+public class QueryDatabaseTable extends AbstractQueryDatabaseTable {
 
     public QueryDatabaseTable() {
         final Set<Relationship> r = new HashSet<>();
@@ -197,365 +115,22 @@ public class QueryDatabaseTable extends AbstractDatabaseFetchProcessor {
     }
 
     @Override
-    public Set<Relationship> getRelationships() {
-        return relationships;
-    }
-
-    @Override
-    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
-        return propDescriptors;
-    }
-
-    @Override
-    protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) {
-        return new PropertyDescriptor.Builder()
-                .name(propertyDescriptorName)
-                .required(false)
-                .addValidator(StandardValidators.createAttributeExpressionLanguageValidator(AttributeExpression.ResultType.STRING, true))
-                .addValidator(StandardValidators.ATTRIBUTE_KEY_PROPERTY_NAME_VALIDATOR)
-                .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
-                .dynamic(true)
-                .build();
-    }
-
-    @OnScheduled
-    public void setup(final ProcessContext context) {
-        maxValueProperties = getDefaultMaxValueProperties(context, null);
-    }
-
-    @OnStopped
-    public void stop() {
-        // Reset the column type map in case properties change
-        setupComplete.set(false);
-    }
-
-    @Override
-    public void onTrigger(final ProcessContext context, final ProcessSessionFactory sessionFactory) throws ProcessException {
-        // Fetch the column/table info once
-        if (!setupComplete.get()) {
-            super.setup(context);
-        }
-        ProcessSession session = sessionFactory.createSession();
-        final List<FlowFile> resultSetFlowFiles = new ArrayList<>();
-
-        final ComponentLog logger = getLogger();
-
-        final DBCPService dbcpService = context.getProperty(DBCP_SERVICE).asControllerService(DBCPService.class);
-        final DatabaseAdapter dbAdapter = dbAdapters.get(context.getProperty(DB_TYPE).getValue());
+    protected SqlWriter configureSqlWriter(ProcessSession session, ProcessContext context) {
         final String tableName = context.getProperty(TABLE_NAME).evaluateAttributeExpressions().getValue();
-        final String columnNames = context.getProperty(COLUMN_NAMES).evaluateAttributeExpressions().getValue();
-        final String sqlQuery = context.getProperty(SQL_QUERY).evaluateAttributeExpressions().getValue();
-        final String maxValueColumnNames = context.getProperty(MAX_VALUE_COLUMN_NAMES).evaluateAttributeExpressions().getValue();
-        final String customWhereClause = context.getProperty(WHERE_CLAUSE).evaluateAttributeExpressions().getValue();
-        final Integer fetchSize = context.getProperty(FETCH_SIZE).evaluateAttributeExpressions().asInteger();
+        final boolean convertNamesForAvro = context.getProperty(NORMALIZE_NAMES_FOR_AVRO).asBoolean();
+        final Boolean useAvroLogicalTypes = context.getProperty(USE_AVRO_LOGICAL_TYPES).asBoolean();
         final Integer maxRowsPerFlowFile = context.getProperty(MAX_ROWS_PER_FLOW_FILE).evaluateAttributeExpressions().asInteger();
-        final Integer outputBatchSizeField = context.getProperty(OUTPUT_BATCH_SIZE).evaluateAttributeExpressions().asInteger();
-        final int outputBatchSize = outputBatchSizeField == null ? 0 : outputBatchSizeField;
-        final Integer maxFragments = context.getProperty(MAX_FRAGMENTS).isSet()
-                ? context.getProperty(MAX_FRAGMENTS).evaluateAttributeExpressions().asInteger()
-                : 0;
+        final Integer defaultPrecision = context.getProperty(DEFAULT_PRECISION).evaluateAttributeExpressions().asInteger();
+        final Integer defaultScale = context.getProperty(DEFAULT_SCALE).evaluateAttributeExpressions().asInteger();
+
         final JdbcCommon.AvroConversionOptions options = JdbcCommon.AvroConversionOptions.builder()
                 .recordName(tableName)
+                .convertNames(convertNamesForAvro)
+                .useLogicalTypes(useAvroLogicalTypes)
+                .defaultPrecision(defaultPrecision)
+                .defaultScale(defaultScale)
                 .maxRows(maxRowsPerFlowFile)
-                .convertNames(context.getProperty(NORMALIZE_NAMES_FOR_AVRO).asBoolean())
-                .useLogicalTypes(context.getProperty(USE_AVRO_LOGICAL_TYPES).asBoolean())
-                .defaultPrecision(context.getProperty(DEFAULT_PRECISION).evaluateAttributeExpressions().asInteger())
-                .defaultScale(context.getProperty(DEFAULT_SCALE).evaluateAttributeExpressions().asInteger())
                 .build();
-
-        final StateManager stateManager = context.getStateManager();
-        final StateMap stateMap;
-
-        try {
-            stateMap = stateManager.getState(Scope.CLUSTER);
-        } catch (final IOException ioe) {
-            getLogger().error("Failed to retrieve observed maximum values from the State Manager. Will not perform "
-                    + "query until this is accomplished.", ioe);
-            context.yield();
-            return;
-        }
-        // Make a mutable copy of the current state property map. This will be updated by the result row callback, and eventually
-        // set as the current state map (after the session has been committed)
-        final Map<String, String> statePropertyMap = new HashMap<>(stateMap.toMap());
-
-        //If an initial max value for column(s) has been specified using properties, and this column is not in the state manager, sync them to the state property map
-        for (final Map.Entry<String, String> maxProp : maxValueProperties.entrySet()) {
-            String maxPropKey = maxProp.getKey().toLowerCase();
-            String fullyQualifiedMaxPropKey = getStateKey(tableName, maxPropKey, dbAdapter);
-            if (!statePropertyMap.containsKey(fullyQualifiedMaxPropKey)) {
-                String newMaxPropValue;
-                // If we can't find the value at the fully-qualified key name, it is possible (under a previous scheme)
-                // the value has been stored under a key that is only the column name. Fall back to check the column name,
-                // but store the new initial max value under the fully-qualified key.
-                if (statePropertyMap.containsKey(maxPropKey)) {
-                    newMaxPropValue = statePropertyMap.get(maxPropKey);
-                } else {
-                    newMaxPropValue = maxProp.getValue();
-                }
-                statePropertyMap.put(fullyQualifiedMaxPropKey, newMaxPropValue);
-
-            }
-        }
-
-        List<String> maxValueColumnNameList = StringUtils.isEmpty(maxValueColumnNames)
-                ? null
-                : Arrays.asList(maxValueColumnNames.split("\\s*,\\s*"));
-        final String selectQuery = getQuery(dbAdapter, tableName, sqlQuery, columnNames, maxValueColumnNameList, customWhereClause, statePropertyMap);
-        final StopWatch stopWatch = new StopWatch(true);
-        final String fragmentIdentifier = UUID.randomUUID().toString();
-
-        try (final Connection con = dbcpService.getConnection(Collections.emptyMap());
-             final Statement st = con.createStatement()) {
-
-            if (fetchSize != null && fetchSize > 0) {
-                try {
-                    st.setFetchSize(fetchSize);
-                } catch (SQLException se) {
-                    // Not all drivers support this, just log the error (at debug level) and move on
-                    logger.debug("Cannot set fetch size to {} due to {}", new Object[]{fetchSize, se.getLocalizedMessage()}, se);
-                }
-            }
-
-            String jdbcURL = "DBCPService";
-            try {
-                DatabaseMetaData databaseMetaData = con.getMetaData();
-                if (databaseMetaData != null) {
-                    jdbcURL = databaseMetaData.getURL();
-                }
-            } catch (SQLException se) {
-                // Ignore and use default JDBC URL. This shouldn't happen unless the driver doesn't implement getMetaData() properly
-            }
-
-            final Integer queryTimeout = context.getProperty(QUERY_TIMEOUT).evaluateAttributeExpressions().asTimePeriod(TimeUnit.SECONDS).intValue();
-            st.setQueryTimeout(queryTimeout); // timeout in seconds
-            if (logger.isDebugEnabled()) {
-                logger.debug("Executing query {}", new Object[] { selectQuery });
-            }
-            try (final ResultSet resultSet = st.executeQuery(selectQuery)) {
-                int fragmentIndex=0;
-                // Max values will be updated in the state property map by the callback
-                final MaxValueResultSetRowCollector maxValCollector = new MaxValueResultSetRowCollector(tableName, statePropertyMap, dbAdapter);
-
-                while(true) {
-                    final AtomicLong nrOfRows = new AtomicLong(0L);
-
-                    FlowFile fileToProcess = session.create();
-                    try {
-                        fileToProcess = session.write(fileToProcess, out -> {
-                            try {
-                                nrOfRows.set(JdbcCommon.convertToAvroStream(resultSet, out, options, maxValCollector));
-                            } catch (SQLException | RuntimeException e) {
-                                throw new ProcessException("Error during database query or conversion of records to Avro.", e);
-                            }
-                        });
-                    } catch (ProcessException e) {
-                        // Add flowfile to results before rethrowing so it will be removed from session in outer catch
-                        resultSetFlowFiles.add(fileToProcess);
-                        throw e;
-                    }
-
-                    if (nrOfRows.get() > 0) {
-                        // set attribute how many rows were selected
-                        fileToProcess = session.putAttribute(fileToProcess, RESULT_ROW_COUNT, String.valueOf(nrOfRows.get()));
-                        fileToProcess = session.putAttribute(fileToProcess, RESULT_TABLENAME, tableName);
-                        fileToProcess = session.putAttribute(fileToProcess, CoreAttributes.MIME_TYPE.key(), JdbcCommon.MIME_TYPE_AVRO_BINARY);
-                        if(maxRowsPerFlowFile > 0) {
-                            fileToProcess = session.putAttribute(fileToProcess, FRAGMENT_ID, fragmentIdentifier);
-                            fileToProcess = session.putAttribute(fileToProcess, FRAGMENT_INDEX, String.valueOf(fragmentIndex));
-                        }
-
-                        logger.info("{} contains {} Avro records; transferring to 'success'",
-                                new Object[]{fileToProcess, nrOfRows.get()});
-
-                        session.getProvenanceReporter().receive(fileToProcess, jdbcURL, stopWatch.getElapsed(TimeUnit.MILLISECONDS));
-                        resultSetFlowFiles.add(fileToProcess);
-                        // If we've reached the batch size, send out the flow files
-                        if (outputBatchSize > 0 && resultSetFlowFiles.size() >= outputBatchSize) {
-                            session.transfer(resultSetFlowFiles, REL_SUCCESS);
-                            session.commit();
-                            resultSetFlowFiles.clear();
-                        }
-                    } else {
-                        // If there were no rows returned, don't send the flowfile
-                        session.remove(fileToProcess);
-                        // If no rows and this was first FlowFile, yield
-                        if(fragmentIndex == 0){
-                            context.yield();
-                        }
-                        break;
-                    }
-
-                    fragmentIndex++;
-                    if (maxFragments > 0 && fragmentIndex >= maxFragments) {
-                        break;
-                    }
-
-                    // If we aren't splitting up the data into flow files or fragments, then the result set has been entirely fetched so don't loop back around
-                    if (maxFragments == 0 && maxRowsPerFlowFile == 0) {
-                        break;
-                    }
-
-                    // If we are splitting up the data into flow files, don't loop back around if we've gotten all results
-                    if(maxRowsPerFlowFile > 0 && nrOfRows.get() < maxRowsPerFlowFile) {
-                        break;
-                    }
-                }
-
-                // Apply state changes from the Max Value tracker
-                maxValCollector.applyStateChanges();
-
-                // Even though the maximum value and total count are known at this point, to maintain consistent behavior if Output Batch Size is set, do not store the attributes
-                if (outputBatchSize == 0) {
-                    for (int i = 0; i < resultSetFlowFiles.size(); i++) {
-                        // Add maximum values as attributes
-                        for (Map.Entry<String, String> entry : statePropertyMap.entrySet()) {
-                            // Get just the column name from the key
-                            String key = entry.getKey();
-                            String colName = key.substring(key.lastIndexOf(NAMESPACE_DELIMITER) + NAMESPACE_DELIMITER.length());
-                            resultSetFlowFiles.set(i, session.putAttribute(resultSetFlowFiles.get(i), "maxvalue." + colName, entry.getValue()));
-                        }
-
-                        //set count on all FlowFiles
-                        if (maxRowsPerFlowFile > 0) {
-                            resultSetFlowFiles.set(i,
-                                    session.putAttribute(resultSetFlowFiles.get(i), "fragment.count", Integer.toString(fragmentIndex)));
-                        }
-                    }
-                }
-            } catch (final SQLException e) {
-                throw e;
-            }
-
-            session.transfer(resultSetFlowFiles, REL_SUCCESS);
-
-        } catch (final ProcessException | SQLException e) {
-            logger.error("Unable to execute SQL select query {} due to {}", new Object[]{selectQuery, e});
-            if (!resultSetFlowFiles.isEmpty()) {
-                session.remove(resultSetFlowFiles);
-            }
-            context.yield();
-        } finally {
-            session.commit();
-            try {
-                // Update the state
-                stateManager.setState(statePropertyMap, Scope.CLUSTER);
-            } catch (IOException ioe) {
-                getLogger().error("{} failed to update State Manager, maximum observed values will not be recorded", new Object[]{this, ioe});
-            }
-        }
-    }
-
-    protected String getQuery(DatabaseAdapter dbAdapter, String tableName, String columnNames, List<String> maxValColumnNames,
-                              String customWhereClause, Map<String, String> stateMap) {
-
-        return getQuery(dbAdapter, tableName, null, columnNames, maxValColumnNames, customWhereClause, stateMap);
-    }
-
-    protected String getQuery(DatabaseAdapter dbAdapter, String tableName, String sqlQuery, String columnNames, List<String> maxValColumnNames,
-                              String customWhereClause, Map<String, String> stateMap) {
-        if (StringUtils.isEmpty(tableName)) {
-            throw new IllegalArgumentException("Table name must be specified");
-        }
-        final StringBuilder query;
-
-        if (StringUtils.isEmpty(sqlQuery)) {
-            query = new StringBuilder(dbAdapter.getSelectStatement(tableName, columnNames, null, null, null, null));
-        } else {
-            query = getWrappedQuery(dbAdapter, sqlQuery, tableName);
-        }
-
-        List<String> whereClauses = new ArrayList<>();
-        // Check state map for last max values
-        if (stateMap != null && !stateMap.isEmpty() && maxValColumnNames != null) {
-            IntStream.range(0, maxValColumnNames.size()).forEach((index) -> {
-                String colName = maxValColumnNames.get(index);
-                String maxValueKey = getStateKey(tableName, colName, dbAdapter);
-                String maxValue = stateMap.get(maxValueKey);
-                if (StringUtils.isEmpty(maxValue)) {
-                    // If we can't find the value at the fully-qualified key name, it is possible (under a previous scheme)
-                    // the value has been stored under a key that is only the column name. Fall back to check the column name; either way, when a new
-                    // maximum value is observed, it will be stored under the fully-qualified key from then on.
-                    maxValue = stateMap.get(colName.toLowerCase());
-                }
-                if (!StringUtils.isEmpty(maxValue)) {
-                    Integer type = columnTypeMap.get(maxValueKey);
-                    if (type == null) {
-                        // This shouldn't happen as we are populating columnTypeMap when the processor is scheduled.
-                        throw new IllegalArgumentException("No column type found for: " + colName);
-                    }
-                    // Add a condition for the WHERE clause
-                    whereClauses.add(colName + (index == 0 ? " > " : " >= ") + getLiteralByType(type, maxValue, dbAdapter.getName()));
-                }
-            });
-        }
-
-        if (customWhereClause != null) {
-            whereClauses.add("(" + customWhereClause + ")");
-        }
-
-        if (!whereClauses.isEmpty()) {
-            query.append(" WHERE ");
-            query.append(StringUtils.join(whereClauses, " AND "));
-        }
-
-        return query.toString();
-    }
-
-    protected class MaxValueResultSetRowCollector implements JdbcCommon.ResultSetRowCallback {
-        DatabaseAdapter dbAdapter;
-        final Map<String, String> newColMap;
-        final Map<String, String> originalState;
-        String tableName;
-
-        public MaxValueResultSetRowCollector(String tableName, Map<String, String> stateMap, DatabaseAdapter dbAdapter) {
-            this.dbAdapter = dbAdapter;
-            this.originalState = stateMap;
-
-            this.newColMap = new HashMap<>();
-            this.newColMap.putAll(stateMap);
-
-            this.tableName = tableName;
-        }
-
-        @Override
-        public void processRow(ResultSet resultSet) throws IOException {
-            if (resultSet == null) {
-                return;
-            }
-            try {
-                // Iterate over the row, check-and-set max values
-                final ResultSetMetaData meta = resultSet.getMetaData();
-                final int nrOfColumns = meta.getColumnCount();
-                if (nrOfColumns > 0) {
-                    for (int i = 1; i <= nrOfColumns; i++) {
-                        String colName = meta.getColumnName(i).toLowerCase();
-                        String fullyQualifiedMaxValueKey = getStateKey(tableName, colName, dbAdapter);
-                        Integer type = columnTypeMap.get(fullyQualifiedMaxValueKey);
-                        // Skip any columns we're not keeping track of or whose value is null
-                        if (type == null || resultSet.getObject(i) == null) {
-                            continue;
-                        }
-                        String maxValueString = newColMap.get(fullyQualifiedMaxValueKey);
-                        // If we can't find the value at the fully-qualified key name, it is possible (under a previous scheme)
-                        // the value has been stored under a key that is only the column name. Fall back to check the column name; either way, when a new
-                        // maximum value is observed, it will be stored under the fully-qualified key from then on.
-                        if (StringUtils.isEmpty(maxValueString)) {
-                            maxValueString = newColMap.get(colName);
-                        }
-                        String newMaxValueString = getMaxValueFromRow(resultSet, i, type, maxValueString, dbAdapter.getName());
-                        if (newMaxValueString != null) {
-                            newColMap.put(fullyQualifiedMaxValueKey, newMaxValueString);
-                        }
-                    }
-                }
-            } catch (ParseException | SQLException e) {
-                throw new IOException(e);
-            }
-        }
-
-        @Override
-        public void applyStateChanges() {
-            this.originalState.putAll(this.newColMap);
-        }
+        return new DefaultAvroSqlWriter(options);
     }
 }
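
As a worked illustration of the query-building logic this refactor moves up into
AbstractQueryDatabaseTable (the table, column, and values are invented): with a table name of
"users", a single Maximum Value column "id" whose last observed value in state is 100, and a custom
WHERE clause of status = 'ACTIVE', the generated statement is roughly

    SELECT * FROM users WHERE id > 100 AND (status = 'ACTIVE')

The first Maximum Value column is compared with a strict ">" while any additional columns use ">=",
matching the index check in the loop shown above.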

http://git-wip-us.apache.org/repos/asf/nifi/blob/c6572f04/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryDatabaseTableRecord.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryDatabaseTableRecord.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryDatabaseTableRecord.java
new file mode 100644
index 0000000..ea89256
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryDatabaseTableRecord.java
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.standard;
+
+import org.apache.nifi.annotation.behavior.DynamicProperty;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
+import org.apache.nifi.annotation.behavior.Stateful;
+import org.apache.nifi.annotation.behavior.TriggerSerially;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.state.Scope;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processors.standard.sql.RecordSqlWriter;
+import org.apache.nifi.processors.standard.sql.SqlWriter;
+import org.apache.nifi.processors.standard.util.JdbcCommon;
+import org.apache.nifi.serialization.RecordSetWriterFactory;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import static org.apache.nifi.processors.standard.util.JdbcCommon.USE_AVRO_LOGICAL_TYPES;
+
+
+@TriggerSerially
+@InputRequirement(Requirement.INPUT_FORBIDDEN)
+@Tags({"sql", "select", "jdbc", "query", "database", "record"})
+@SeeAlso({GenerateTableFetch.class, ExecuteSQL.class})
+@CapabilityDescription("Generates a SQL select query, or uses a provided statement, and executes it to fetch all rows whose values in the specified "
+        + "Maximum Value column(s) are larger than the "
+        + "previously-seen maxima. Query result will be converted to the format specified by the record writer. Expression Language is supported for several properties, but no incoming "
+        + "connections are permitted. The Variable Registry may be used to provide values for any property containing Expression Language. If it is desired to "
+        + "leverage flow file attributes to perform these queries, the GenerateTableFetch and/or ExecuteSQL processors can be used for this purpose. "
+        + "Streaming is used so arbitrarily large result sets are supported. This processor can be scheduled to run on "
+        + "a timer or cron expression, using the standard scheduling methods. This processor is intended to be run on the Primary Node only. FlowFile attribute "
+        + "'querydbtable.row.count' indicates how many rows were selected.")
+@Stateful(scopes = Scope.CLUSTER, description = "After performing a query on the specified table, the maximum values for "
+        + "the specified column(s) will be retained for use in future executions of the query. This allows the Processor "
+        + "to fetch only those records that have max values greater than the retained values. This can be used for "
+        + "incremental fetching, fetching of newly added rows, etc. To clear the maximum values, clear the state of the processor "
+        + "per the State Management documentation")
+@WritesAttributes({
+        @WritesAttribute(attribute = "tablename", description="Name of the table being queried"),
+        @WritesAttribute(attribute = "querydbtable.row.count", description="The number of rows selected by the query"),
+        @WritesAttribute(attribute="fragment.identifier", description="If 'Max Rows Per Flow File' is set then all FlowFiles from the same query result set "
+                + "will have the same value for the fragment.identifier attribute. This can then be used to correlate the results."),
+        @WritesAttribute(attribute = "fragment.count", description = "If 'Max Rows Per Flow File' is set then this is the total number of  "
+                + "FlowFiles produced by a single ResultSet. This can be used in conjunction with the "
+                + "fragment.identifier attribute in order to know how many FlowFiles belonged to the same incoming ResultSet. If Output Batch Size is set, then this "
+                + "attribute will not be populated."),
+        @WritesAttribute(attribute="fragment.index", description="If 'Max Rows Per Flow File' is set then the position of this FlowFile in the list of "
+                + "outgoing FlowFiles that were all derived from the same result set FlowFile. This can be "
+                + "used in conjunction with the fragment.identifier attribute to know which FlowFiles originated from the same query result set and in what order  "
+                + "FlowFiles were produced"),
+        @WritesAttribute(attribute = "maxvalue.*", description = "Each attribute contains the observed maximum value of a specified 'Maximum-value Column'. The "
+                + "suffix of the attribute is the name of the column. If Output Batch Size is set, then this attribute will not be populated."),
+        @WritesAttribute(attribute = "mime.type", description = "Sets the mime.type attribute to the MIME Type specified by the Record Writer."),
+        @WritesAttribute(attribute = "record.count", description = "The number of records output by the Record Writer.")
+})
+@DynamicProperty(name = "initial.maxvalue.<max_value_column>", value = "Initial maximum value for the specified column",
+        expressionLanguageScope = ExpressionLanguageScope.VARIABLE_REGISTRY, description = "Specifies an initial max value for max value column(s). Properties should "
+        + "be added in the format `initial.maxvalue.<max_value_column>`. This value is only used the first time the table is accessed (when a Maximum Value Column is specified).")
+public class QueryDatabaseTableRecord extends AbstractQueryDatabaseTable {
+
+    public static final PropertyDescriptor RECORD_WRITER_FACTORY = new PropertyDescriptor.Builder()
+            .name("qdbtr-record-writer")
+            .displayName("Record Writer")
+            .description("Specifies the Controller Service to use for writing results to a FlowFile. The Record Writer may use Inherit Schema to emulate the inferred schema behavior, i.e. "
+                    + "an explicit schema need not be defined in the writer, and will be supplied by the same logic used to infer the schema from the column types.")
+            .identifiesControllerService(RecordSetWriterFactory.class)
+            .required(true)
+            .build();
+
+    public static final PropertyDescriptor NORMALIZE_NAMES = new PropertyDescriptor.Builder()
+            .name("qdbtr-normalize")
+            .displayName("Normalize Table/Column Names")
+            .description("Whether to change characters in column names when creating the output schema. For example, colons and periods will be changed to underscores.")
+            .allowableValues("true", "false")
+            .defaultValue("false")
+            .required(true)
+            .build();
+
+    public QueryDatabaseTableRecord() {
+        final Set<Relationship> r = new HashSet<>();
+        r.add(REL_SUCCESS);
+        relationships = Collections.unmodifiableSet(r);
+
+        final List<PropertyDescriptor> pds = new ArrayList<>();
+        pds.add(DBCP_SERVICE);
+        pds.add(DB_TYPE);
+        pds.add(new PropertyDescriptor.Builder()
+                .fromPropertyDescriptor(TABLE_NAME)
+                .description("The name of the database table to be queried. When a custom query is used, this property is used to alias the query and appears as an attribute on the FlowFile.")
+                .build());
+        pds.add(COLUMN_NAMES);
+        pds.add(WHERE_CLAUSE);
+        pds.add(SQL_QUERY);
+        pds.add(RECORD_WRITER_FACTORY);
+        pds.add(MAX_VALUE_COLUMN_NAMES);
+        pds.add(QUERY_TIMEOUT);
+        pds.add(FETCH_SIZE);
+        pds.add(MAX_ROWS_PER_FLOW_FILE);
+        pds.add(OUTPUT_BATCH_SIZE);
+        pds.add(MAX_FRAGMENTS);
+        pds.add(NORMALIZE_NAMES);
+        pds.add(USE_AVRO_LOGICAL_TYPES);
+
+        propDescriptors = Collections.unmodifiableList(pds);
+    }
+
+    @Override
+    protected SqlWriter configureSqlWriter(ProcessSession session, ProcessContext context) {
+        final Integer maxRowsPerFlowFile = context.getProperty(MAX_ROWS_PER_FLOW_FILE).evaluateAttributeExpressions().asInteger();
+        final boolean convertNamesForAvro = context.getProperty(NORMALIZE_NAMES).asBoolean();
+        final Boolean useAvroLogicalTypes = context.getProperty(USE_AVRO_LOGICAL_TYPES).asBoolean();
+        final JdbcCommon.AvroConversionOptions options = JdbcCommon.AvroConversionOptions.builder()
+                .convertNames(convertNamesForAvro)
+                .useLogicalTypes(useAvroLogicalTypes)
+                .build();
+        final RecordSetWriterFactory recordSetWriterFactory = context.getProperty(RECORD_WRITER_FACTORY).asControllerService(RecordSetWriterFactory.class);
+
+        return new RecordSqlWriter(recordSetWriterFactory, options, maxRowsPerFlowFile, Collections.emptyMap());
+    }
+}
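
A similar sketch for this processor, again with invented names and an omitted DBCPService setup;
the state-driven behavior is the point: after the first run, later runs only return rows whose "id"
exceeds the maximum recorded in cluster state.

    TestRunner runner = TestRunners.newTestRunner(QueryDatabaseTableRecord.class);
    // DBCPService registration and DB_TYPE selection are assumed and omitted here.
    MockRecordWriter recordWriter = new MockRecordWriter(null, true);
    runner.addControllerService("writer", recordWriter);
    runner.enableControllerService(recordWriter);
    runner.setProperty(QueryDatabaseTableRecord.RECORD_WRITER_FACTORY, "writer");
    runner.setProperty(QueryDatabaseTableRecord.TABLE_NAME, "persons");
    runner.setProperty(QueryDatabaseTableRecord.MAX_VALUE_COLUMN_NAMES, "id");
    runner.run();   // first run fetches everything and stores max(id) in cluster state
    runner.run();   // subsequent runs fetch only rows with id greater than the stored maximum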

http://git-wip-us.apache.org/repos/asf/nifi/blob/c6572f04/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/sql/DefaultAvroSqlWriter.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/sql/DefaultAvroSqlWriter.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/sql/DefaultAvroSqlWriter.java
new file mode 100644
index 0000000..574aca7
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/sql/DefaultAvroSqlWriter.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.standard.sql;
+
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processors.standard.AbstractQueryDatabaseTable;
+import org.apache.nifi.processors.standard.util.JdbcCommon;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.HashMap;
+import java.util.Map;
+
+public class DefaultAvroSqlWriter implements SqlWriter {
+
+    private final JdbcCommon.AvroConversionOptions options;
+
+    private final Map<String,String> attributesToAdd = new HashMap<String,String>() {{
+        put(CoreAttributes.MIME_TYPE.key(), JdbcCommon.MIME_TYPE_AVRO_BINARY);
+    }};
+
+    public DefaultAvroSqlWriter(JdbcCommon.AvroConversionOptions options) {
+        this.options = options;
+    }
+
+    @Override
+    public long writeResultSet(ResultSet resultSet, OutputStream outputStream, ComponentLog logger, AbstractQueryDatabaseTable.MaxValueResultSetRowCollector callback) throws Exception {
+        try {
+            return JdbcCommon.convertToAvroStream(resultSet, outputStream, options, callback);
+        } catch (SQLException e) {
+            throw new ProcessException(e);
+        }
+    }
+
+    @Override
+    public Map<String, String> getAttributesToAdd() {
+        return attributesToAdd;
+    }
+
+    @Override
+    public void writeEmptyResultSet(OutputStream outputStream, ComponentLog logger) throws IOException {
+        JdbcCommon.createEmptyAvroStream(outputStream);
+    }
+
+    @Override
+    public String getMimeType() {
+        return JdbcCommon.MIME_TYPE_AVRO_BINARY;
+    }
+}
\ No newline at end of file
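
A minimal usage sketch, assuming a live ResultSet rs, an OutputStream out, and a ComponentLog
logger are already in scope inside a method that declares throws Exception; passing a null callback
simply skips max-value tracking.

    final JdbcCommon.AvroConversionOptions options = JdbcCommon.AvroConversionOptions.builder()
            .convertNames(true)
            .useLogicalTypes(true)
            .build();
    final SqlWriter writer = new DefaultAvroSqlWriter(options);
    final long rowCount = writer.writeResultSet(rs, out, logger, null);
    // getAttributesToAdd() already carries the binary Avro mime.type for the outgoing FlowFile.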

http://git-wip-us.apache.org/repos/asf/nifi/blob/c6572f04/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/sql/RecordSqlWriter.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/sql/RecordSqlWriter.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/sql/RecordSqlWriter.java
new file mode 100644
index 0000000..c1a76b4
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/sql/RecordSqlWriter.java
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.standard.sql;
+
+import org.apache.avro.Schema;
+import org.apache.nifi.avro.AvroTypeUtil;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processors.standard.AbstractQueryDatabaseTable;
+import org.apache.nifi.processors.standard.util.JdbcCommon;
+import org.apache.nifi.schema.access.SchemaNotFoundException;
+import org.apache.nifi.serialization.RecordSetWriter;
+import org.apache.nifi.serialization.RecordSetWriterFactory;
+import org.apache.nifi.serialization.WriteResult;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordSchema;
+import org.apache.nifi.serialization.record.RecordSet;
+import org.apache.nifi.serialization.record.ResultSetRecordSet;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicReference;
+
+public class RecordSqlWriter implements SqlWriter {
+
+    private final RecordSetWriterFactory recordSetWriterFactory;
+    private final AtomicReference<WriteResult> writeResultRef;
+    private final JdbcCommon.AvroConversionOptions options;
+    private final int maxRowsPerFlowFile;
+    private final Map<String, String> originalAttributes;
+    private ResultSetRecordSet fullRecordSet;
+    private RecordSchema writeSchema;
+    private String mimeType;
+
+    public RecordSqlWriter(RecordSetWriterFactory recordSetWriterFactory, JdbcCommon.AvroConversionOptions options, int maxRowsPerFlowFile, Map<String, String> originalAttributes) {
+        this.recordSetWriterFactory = recordSetWriterFactory;
+        this.writeResultRef = new AtomicReference<>();
+        this.maxRowsPerFlowFile = maxRowsPerFlowFile;
+        this.options = options;
+        this.originalAttributes = originalAttributes;
+    }
+
+    @Override
+    public long writeResultSet(ResultSet resultSet, OutputStream outputStream, ComponentLog logger, AbstractQueryDatabaseTable.MaxValueResultSetRowCollector callback) throws Exception {
+        final RecordSet recordSet;
+        try {
+            if (fullRecordSet == null) {
+                final Schema avroSchema = JdbcCommon.createSchema(resultSet, options);
+                final RecordSchema recordAvroSchema = AvroTypeUtil.createSchema(avroSchema);
+                fullRecordSet = new ResultSetRecordSetWithCallback(resultSet, recordAvroSchema, callback);
+                writeSchema = recordSetWriterFactory.getSchema(originalAttributes, fullRecordSet.getSchema());
+            }
+            recordSet = (maxRowsPerFlowFile > 0) ? fullRecordSet.limit(maxRowsPerFlowFile) : fullRecordSet;
+
+        } catch (final SQLException | SchemaNotFoundException | IOException e) {
+            throw new ProcessException(e);
+        }
+        try (final RecordSetWriter resultSetWriter = recordSetWriterFactory.createWriter(logger, writeSchema, outputStream)) {
+            writeResultRef.set(resultSetWriter.write(recordSet));
+            if (mimeType == null) {
+                mimeType = resultSetWriter.getMimeType();
+            }
+            return writeResultRef.get().getRecordCount();
+        } catch (final Exception e) {
+            throw new IOException(e);
+        }
+    }
+
+    @Override
+    public Map<String, String> getAttributesToAdd() {
+        Map<String, String> attributesToAdd = new HashMap<>();
+        attributesToAdd.put(CoreAttributes.MIME_TYPE.key(), mimeType);
+
+        // Add any attributes from the record writer (if present)
+        final WriteResult result = writeResultRef.get();
+        if (result != null) {
+            if (result.getAttributes() != null) {
+                attributesToAdd.putAll(result.getAttributes());
+            }
+
+            attributesToAdd.put("record.count", String.valueOf(result.getRecordCount()));
+        }
+        return attributesToAdd;
+    }
+
+    @Override
+    public void updateCounters(ProcessSession session) {
+        final WriteResult result = writeResultRef.get();
+        if (result != null) {
+            session.adjustCounter("Records Written", result.getRecordCount(), false);
+        }
+    }
+
+    @Override
+    public void writeEmptyResultSet(OutputStream outputStream, ComponentLog logger) throws IOException {
+        try (final RecordSetWriter resultSetWriter = recordSetWriterFactory.createWriter(logger, writeSchema, outputStream)) {
+            mimeType = resultSetWriter.getMimeType();
+            resultSetWriter.beginRecordSet();
+            resultSetWriter.finishRecordSet();
+        } catch (final Exception e) {
+            throw new IOException(e);
+        }
+    }
+
+    @Override
+    public String getMimeType() {
+        return mimeType;
+    }
+
+    private static class ResultSetRecordSetWithCallback extends ResultSetRecordSet {
+
+        private final AbstractQueryDatabaseTable.MaxValueResultSetRowCollector callback;
+
+        ResultSetRecordSetWithCallback(ResultSet rs, RecordSchema readerSchema, AbstractQueryDatabaseTable.MaxValueResultSetRowCollector callback) throws SQLException {
+            super(rs, readerSchema);
+            this.callback = callback;
+        }
+
+        @Override
+        public Record next() throws IOException {
+            try {
+                if (hasMoreRows()) {
+                    ResultSet rs = getResultSet();
+                    final Record record = createRecord(rs);
+                    if (callback != null) {
+                        callback.processRow(rs);
+                    }
+                    setMoreRows(rs.next());
+                    return record;
+                } else {
+                    return null;
+                }
+            } catch (final SQLException e) {
+                throw new IOException("Could not obtain next record from ResultSet", e);
+            }
+        }
+    }
+}
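
The record-based writer follows the same pattern; this sketch assumes an enabled
RecordSetWriterFactory named factory plus rs, out, and logger in scope, and passes 0 so the writer
does not cap rows per FlowFile. The attribute map that seeds the schema lookup is empty here.

    final JdbcCommon.AvroConversionOptions options = JdbcCommon.AvroConversionOptions.builder()
            .convertNames(false)
            .useLogicalTypes(true)
            .build();
    final SqlWriter writer = new RecordSqlWriter(factory, options, 0, Collections.emptyMap());
    final long recordCount = writer.writeResultSet(rs, out, logger, null);
    final Map<String, String> attrs = writer.getAttributesToAdd();   // mime.type from the record writer plus record.count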

http://git-wip-us.apache.org/repos/asf/nifi/blob/c6572f04/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/sql/SqlWriter.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/sql/SqlWriter.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/sql/SqlWriter.java
new file mode 100644
index 0000000..08fc3fd
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/sql/SqlWriter.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.standard.sql;
+
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processors.standard.AbstractQueryDatabaseTable;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.sql.ResultSet;
+import java.util.Collections;
+import java.util.Map;
+
+/**
+ * The SqlWriter interface provides a standard way for processors such as ExecuteSQL, ExecuteSQLRecord, QueryDatabaseTable, and QueryDatabaseTableRecord
+ * to write SQL result sets out to a flow file in whichever manner is appropriate. For example, ExecuteSQL writes the result set as Avro but ExecuteSQLRecord
+ * uses the Record API to write the result set out as prescribed by the selected RecordSetWriter.
+ */
+public interface SqlWriter {
+
+    /**
+     * Writes the given result set out to the given output stream, possibly applying a callback as each row is processed.
+     * @param resultSet the ResultSet to be written
+     * @param outputStream the OutputStream to write the result set to
+     * @param logger a common logger that can be used to log messages during write
+     * @param callback a MaxValueResultSetRowCollector that may be called as each row in the ResultSet is processed
+     * @return the number of rows written to the output stream
+     * @throws Exception if any errors occur during the writing of the result set to the output stream
+     */
+    long writeResultSet(ResultSet resultSet, OutputStream outputStream, ComponentLog logger, AbstractQueryDatabaseTable.MaxValueResultSetRowCollector callback) throws Exception;
+
+    /**
+     * Returns a map of attribute key/value pairs to be added to any outgoing flow file(s). The default implementation is to return an empty map.
+     * @return a map of attribute key/value pairs
+     */
+    default Map<String, String> getAttributesToAdd() {
+        return Collections.emptyMap();
+    }
+
+    /**
+     * Updates any session counters as a result of processing result sets. The default implementation is empty, no counters will be updated.
+     * @param session the session upon which to update counters
+     */
+    default void updateCounters(ProcessSession session) {
+    }
+
+    /**
+     * Writes an empty result set to the output stream. In some cases a ResultSet might not have any viable rows, but will throw an error or
+     * behave unexpectedly if rows are attempted to be retrieved. This method indicates the implementation should write whatever output is
+     * appropriate for a result set with no rows.
+     * @param outputStream the OutputStream to write the empty result set to
+     * @param logger a common logger that can be used to log messages during write
+     * @throws IOException if any errors occur during the writing of an empty result set to the output stream
+     */
+    void writeEmptyResultSet(OutputStream outputStream, ComponentLog logger) throws IOException;
+
+    /**
+     * Returns the MIME type of the output format. This can be used in FlowFile attributes or to perform format-specific processing as necessary.
+     * @return the MIME type string of the output format.
+     */
+    String getMimeType();
+}
\ No newline at end of file
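
To make the contract concrete, here is a hypothetical implementation that is not part of this
commit: a tab-separated text writer. The class name and output format are invented; only the
SqlWriter, ComponentLog, and MaxValueResultSetRowCollector types come from the code above.

    package org.apache.nifi.processors.standard.sql;

    import org.apache.nifi.logging.ComponentLog;
    import org.apache.nifi.processors.standard.AbstractQueryDatabaseTable;

    import java.io.IOException;
    import java.io.OutputStream;
    import java.nio.charset.StandardCharsets;
    import java.sql.ResultSet;

    public class TabSeparatedSqlWriter implements SqlWriter {

        @Override
        public long writeResultSet(ResultSet resultSet, OutputStream outputStream, ComponentLog logger,
                                   AbstractQueryDatabaseTable.MaxValueResultSetRowCollector callback) throws Exception {
            final int columnCount = resultSet.getMetaData().getColumnCount();
            long rows = 0;
            while (resultSet.next()) {
                final StringBuilder line = new StringBuilder();
                for (int i = 1; i <= columnCount; i++) {
                    if (i > 1) {
                        line.append('\t');
                    }
                    line.append(resultSet.getObject(i));
                }
                line.append('\n');
                outputStream.write(line.toString().getBytes(StandardCharsets.UTF_8));
                if (callback != null) {
                    callback.processRow(resultSet);   // keeps max-value tracking intact for QueryDatabaseTable-style processors
                }
                rows++;
            }
            return rows;
        }

        @Override
        public void writeEmptyResultSet(OutputStream outputStream, ComponentLog logger) throws IOException {
            // An empty result set simply yields an empty FlowFile in this format.
        }

        @Override
        public String getMimeType() {
            return "text/tab-separated-values";
        }
    }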

http://git-wip-us.apache.org/repos/asf/nifi/blob/c6572f04/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
index d21b7f4..bfe1403 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
@@ -35,6 +35,7 @@ org.apache.nifi.processors.standard.EvaluateXPath
 org.apache.nifi.processors.standard.EvaluateXQuery
 org.apache.nifi.processors.standard.ExecuteProcess
 org.apache.nifi.processors.standard.ExecuteSQL
+org.apache.nifi.processors.standard.ExecuteSQLRecord
 org.apache.nifi.processors.standard.ExecuteStreamCommand
 org.apache.nifi.processors.standard.ExtractGrok
 org.apache.nifi.processors.standard.ExtractText
@@ -96,6 +97,7 @@ org.apache.nifi.processors.standard.PutSyslog
 org.apache.nifi.processors.standard.PutTCP
 org.apache.nifi.processors.standard.PutUDP
 org.apache.nifi.processors.standard.QueryDatabaseTable
+org.apache.nifi.processors.standard.QueryDatabaseTableRecord
 org.apache.nifi.processors.standard.QueryRecord
 org.apache.nifi.processors.standard.ReplaceText
 org.apache.nifi.processors.standard.ReplaceTextWithMapping


[4/4] nifi git commit: NIFI-4517: Added ExecuteSQLRecord and QueryDatabaseTableRecord processors

Posted by pv...@apache.org.
NIFI-4517: Added ExecuteSQLRecord and QueryDatabaseTableRecord processors

Signed-off-by: Pierre Villard <pi...@gmail.com>

This closes #2945.


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/c6572f04
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/c6572f04
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/c6572f04

Branch: refs/heads/master
Commit: c6572f042bf1637f6faaa2b2ffe4a56e297c6d1a
Parents: b4810b8
Author: Matthew Burgess <ma...@apache.org>
Authored: Fri Aug 10 16:49:25 2018 -0400
Committer: Pierre Villard <pi...@gmail.com>
Committed: Sat Oct 6 10:53:11 2018 +0200

----------------------------------------------------------------------
 .../record/ResultSetRecordSet.java              |   15 +-
 .../processors/standard/AbstractExecuteSQL.java |  369 +++++
 .../standard/AbstractQueryDatabaseTable.java    |  483 +++++++
 .../nifi/processors/standard/ExecuteSQL.java    |  371 +----
 .../processors/standard/ExecuteSQLRecord.java   |  147 ++
 .../processors/standard/QueryDatabaseTable.java |  453 +-----
 .../standard/QueryDatabaseTableRecord.java      |  148 ++
 .../standard/sql/DefaultAvroSqlWriter.java      |   67 +
 .../standard/sql/RecordSqlWriter.java           |  158 +++
 .../nifi/processors/standard/sql/SqlWriter.java |   77 +
 .../org.apache.nifi.processor.Processor         |    2 +
 .../standard/QueryDatabaseTableRecordTest.java  | 1332 ++++++++++++++++++
 .../standard/QueryDatabaseTableTest.java        |    2 +-
 .../processors/standard/TestExecuteSQL.java     |   44 +-
 .../standard/TestExecuteSQLRecord.java          |  376 +++++
 15 files changed, 3261 insertions(+), 783 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/c6572f04/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/ResultSetRecordSet.java
----------------------------------------------------------------------
diff --git a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/ResultSetRecordSet.java b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/ResultSetRecordSet.java
index 551789c..bf7d224 100644
--- a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/ResultSetRecordSet.java
+++ b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/ResultSetRecordSet.java
@@ -63,6 +63,19 @@ public class ResultSetRecordSet implements RecordSet, Closeable {
         return schema;
     }
 
+    // Protected methods for subclasses to access private member variables
+    protected ResultSet getResultSet() {
+        return rs;
+    }
+
+    protected boolean hasMoreRows() {
+        return moreRows;
+    }
+
+    protected void setMoreRows(boolean moreRows) {
+        this.moreRows = moreRows;
+    }
+
     @Override
     public Record next() throws IOException {
         try {
@@ -87,7 +100,7 @@ public class ResultSetRecordSet implements RecordSet, Closeable {
         }
     }
 
-    private Record createRecord(final ResultSet rs) throws SQLException {
+    protected Record createRecord(final ResultSet rs) throws SQLException {
         final Map<String, Object> values = new HashMap<>(schema.getFieldCount());
 
         for (final RecordField field : schema.getFields()) {
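
These members are opened up presumably so that subclasses (such as the record-based SQL writer added in this commit) can customize how rows are turned into Records. A hypothetical subclass sketch, not part of this commit; the (ResultSet, RecordSchema) constructor is assumed from the existing class:

import java.sql.ResultSet;
import java.sql.SQLException;

import org.apache.nifi.serialization.record.Record;
import org.apache.nifi.serialization.record.RecordSchema;
import org.apache.nifi.serialization.record.ResultSetRecordSet;

// Hypothetical example showing the newly protected createRecord() hook.
public class PostProcessingResultSetRecordSet extends ResultSetRecordSet {

    public PostProcessingResultSetRecordSet(final ResultSet rs, final RecordSchema schema) throws SQLException {
        super(rs, schema);
    }

    @Override
    protected Record createRecord(final ResultSet rs) throws SQLException {
        // Reuse the parent's column-to-field mapping, then adjust the record as needed.
        final Record record = super.createRecord(rs);
        // ... e.g. redact or enrich individual field values here ...
        return record;
    }
}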

http://git-wip-us.apache.org/repos/asf/nifi/blob/c6572f04/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractExecuteSQL.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractExecuteSQL.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractExecuteSQL.java
new file mode 100644
index 0000000..bf46549
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractExecuteSQL.java
@@ -0,0 +1,369 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.standard;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.dbcp.DBCPService;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.flowfile.attributes.FragmentAttributes;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.processors.standard.sql.SqlWriter;
+import org.apache.nifi.processors.standard.util.JdbcCommon;
+import org.apache.nifi.util.StopWatch;
+
+import java.nio.charset.Charset;
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+
+
+public abstract class AbstractExecuteSQL extends AbstractProcessor {
+
+    public static final String RESULT_ROW_COUNT = "executesql.row.count";
+    public static final String RESULT_QUERY_DURATION = "executesql.query.duration";
+    public static final String RESULT_QUERY_EXECUTION_TIME = "executesql.query.executiontime";
+    public static final String RESULT_QUERY_FETCH_TIME = "executesql.query.fetchtime";
+    public static final String RESULTSET_INDEX = "executesql.resultset.index";
+
+    public static final String FRAGMENT_ID = FragmentAttributes.FRAGMENT_ID.key();
+    public static final String FRAGMENT_INDEX = FragmentAttributes.FRAGMENT_INDEX.key();
+    public static final String FRAGMENT_COUNT = FragmentAttributes.FRAGMENT_COUNT.key();
+
+    // Relationships
+    public static final Relationship REL_SUCCESS = new Relationship.Builder()
+            .name("success")
+            .description("Successfully created FlowFile from SQL query result set.")
+            .build();
+    public static final Relationship REL_FAILURE = new Relationship.Builder()
+            .name("failure")
+            .description("SQL query execution failed. Incoming FlowFile will be penalized and routed to this relationship")
+            .build();
+    protected Set<Relationship> relationships;
+
+    public static final PropertyDescriptor DBCP_SERVICE = new PropertyDescriptor.Builder()
+            .name("Database Connection Pooling Service")
+            .description("The Controller Service that is used to obtain connection to database")
+            .required(true)
+            .identifiesControllerService(DBCPService.class)
+            .build();
+
+    public static final PropertyDescriptor SQL_SELECT_QUERY = new PropertyDescriptor.Builder()
+            .name("SQL select query")
+            .description("The SQL select query to execute. The query can be empty, a constant value, or built from attributes "
+                    + "using Expression Language. If this property is specified, it will be used regardless of the content of "
+                    + "incoming flowfiles. If this property is empty, the content of the incoming flow file is expected "
+                    + "to contain a valid SQL select query, to be issued by the processor to the database. Note that Expression "
+                    + "Language is not evaluated for flow file contents.")
+            .required(false)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .build();
+
+    public static final PropertyDescriptor QUERY_TIMEOUT = new PropertyDescriptor.Builder()
+            .name("Max Wait Time")
+            .description("The maximum amount of time allowed for a running SQL select query "
+                    + " , zero means there is no limit. Max time less than 1 second will be equal to zero.")
+            .defaultValue("0 seconds")
+            .required(true)
+            .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
+            .sensitive(false)
+            .build();
+
+    public static final PropertyDescriptor MAX_ROWS_PER_FLOW_FILE = new PropertyDescriptor.Builder()
+            .name("esql-max-rows")
+            .displayName("Max Rows Per Flow File")
+            .description("The maximum number of result rows that will be included in a single FlowFile. This will allow you to break up very large "
+                    + "result sets into multiple FlowFiles. If the value specified is zero, then all rows are returned in a single FlowFile.")
+            .defaultValue("0")
+            .required(true)
+            .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .build();
+
+    public static final PropertyDescriptor OUTPUT_BATCH_SIZE = new PropertyDescriptor.Builder()
+            .name("esql-output-batch-size")
+            .displayName("Output Batch Size")
+            .description("The number of output FlowFiles to queue before committing the process session. When set to zero, the session will be committed when all result set rows "
+                    + "have been processed and the output FlowFiles are ready for transfer to the downstream relationship. For large result sets, this can cause a large burst of FlowFiles "
+                    + "to be transferred at the end of processor execution. If this property is set, then when the specified number of FlowFiles are ready for transfer, then the session will "
+                    + "be committed, thus releasing the FlowFiles to the downstream relationship. NOTE: The fragment.count attribute will not be set on FlowFiles when this "
+                    + "property is set.")
+            .defaultValue("0")
+            .required(true)
+            .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .build();
+
+    protected List<PropertyDescriptor> propDescriptors;
+
+    protected DBCPService dbcpService;
+
+    @Override
+    public Set<Relationship> getRelationships() {
+        return relationships;
+    }
+
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return propDescriptors;
+    }
+
+    @OnScheduled
+    public void setup(ProcessContext context) {
+        // If the query is not set, an incoming flow file must supply it; if there is no incoming connection either, fail initialization
+        if (!context.getProperty(SQL_SELECT_QUERY).isSet() && !context.hasIncomingConnection()) {
+            final String errorString = "Either the Select Query must be specified or there must be an incoming connection "
+                    + "providing flowfile(s) containing a SQL select query";
+            getLogger().error(errorString);
+            throw new ProcessException(errorString);
+        }
+        dbcpService = context.getProperty(DBCP_SERVICE).asControllerService(DBCPService.class);
+
+    }
+
+    @Override
+    public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
+        FlowFile fileToProcess = null;
+        if (context.hasIncomingConnection()) {
+            fileToProcess = session.get();
+
+            // If we have no FlowFile, and all incoming connections are self-loops then we can continue on.
+            // However, if we have no FlowFile and we have connections coming from other Processors, then
+            // we know that we should run only if we have a FlowFile.
+            if (fileToProcess == null && context.hasNonLoopConnection()) {
+                return;
+            }
+        }
+
+        final List<FlowFile> resultSetFlowFiles = new ArrayList<>();
+
+        final ComponentLog logger = getLogger();
+        final Integer queryTimeout = context.getProperty(QUERY_TIMEOUT).asTimePeriod(TimeUnit.SECONDS).intValue();
+        final Integer maxRowsPerFlowFile = context.getProperty(MAX_ROWS_PER_FLOW_FILE).evaluateAttributeExpressions().asInteger();
+        final Integer outputBatchSizeField = context.getProperty(OUTPUT_BATCH_SIZE).evaluateAttributeExpressions().asInteger();
+        final int outputBatchSize = outputBatchSizeField == null ? 0 : outputBatchSizeField;
+
+        SqlWriter sqlWriter = configureSqlWriter(session, context, fileToProcess);
+
+        final String selectQuery;
+        if (context.getProperty(SQL_SELECT_QUERY).isSet()) {
+            selectQuery = context.getProperty(SQL_SELECT_QUERY).evaluateAttributeExpressions(fileToProcess).getValue();
+        } else {
+            // If the query is not set, then an incoming flow file is required, and expected to contain a valid SQL select query.
+            // If there is no incoming connection, onTrigger will not be called as the processor will fail when scheduled.
+            final StringBuilder queryContents = new StringBuilder();
+            session.read(fileToProcess, in -> queryContents.append(IOUtils.toString(in, Charset.defaultCharset())));
+            selectQuery = queryContents.toString();
+        }
+
+        int resultCount = 0;
+        try (final Connection con = dbcpService.getConnection(fileToProcess == null ? Collections.emptyMap() : fileToProcess.getAttributes());
+             final PreparedStatement st = con.prepareStatement(selectQuery)) {
+            st.setQueryTimeout(queryTimeout); // timeout in seconds
+
+            if (fileToProcess != null) {
+                JdbcCommon.setParameters(st, fileToProcess.getAttributes());
+            }
+            logger.debug("Executing query {}", new Object[]{selectQuery});
+
+            int fragmentIndex = 0;
+            final String fragmentId = UUID.randomUUID().toString();
+
+            final StopWatch executionTime = new StopWatch(true);
+
+            boolean hasResults = st.execute();
+
+            long executionTimeElapsed = executionTime.getElapsed(TimeUnit.MILLISECONDS);
+
+            boolean hasUpdateCount = st.getUpdateCount() != -1;
+
+            while (hasResults || hasUpdateCount) {
+                //getMoreResults() and execute() return false to indicate that the result of the statement is just a number and not a ResultSet
+                if (hasResults) {
+                    final AtomicLong nrOfRows = new AtomicLong(0L);
+
+                    try {
+                        final ResultSet resultSet = st.getResultSet();
+                        do {
+                            final StopWatch fetchTime = new StopWatch(true);
+
+                            FlowFile resultSetFF;
+                            if (fileToProcess == null) {
+                                resultSetFF = session.create();
+                            } else {
+                                resultSetFF = session.create(fileToProcess);
+                                resultSetFF = session.putAllAttributes(resultSetFF, fileToProcess.getAttributes());
+                            }
+
+                            try {
+                                resultSetFF = session.write(resultSetFF, out -> {
+                                    try {
+                                        nrOfRows.set(sqlWriter.writeResultSet(resultSet, out, getLogger(), null));
+                                    } catch (Exception e) {
+                                        throw (e instanceof ProcessException) ? (ProcessException) e : new ProcessException(e);
+                                    }
+                                });
+
+                                long fetchTimeElapsed = fetchTime.getElapsed(TimeUnit.MILLISECONDS);
+
+                                // set attributes
+                                final Map<String, String> attributesToAdd = new HashMap<>();
+                                attributesToAdd.put(RESULT_ROW_COUNT, String.valueOf(nrOfRows.get()));
+                                attributesToAdd.put(RESULT_QUERY_DURATION, String.valueOf(executionTimeElapsed + fetchTimeElapsed));
+                                attributesToAdd.put(RESULT_QUERY_EXECUTION_TIME, String.valueOf(executionTimeElapsed));
+                                attributesToAdd.put(RESULT_QUERY_FETCH_TIME, String.valueOf(fetchTimeElapsed));
+                                attributesToAdd.put(RESULTSET_INDEX, String.valueOf(resultCount));
+                                attributesToAdd.putAll(sqlWriter.getAttributesToAdd());
+                                resultSetFF = session.putAllAttributes(resultSetFF, attributesToAdd);
+                                sqlWriter.updateCounters(session);
+
+                                // if fragmented ResultSet, determine if we should keep this fragment; set fragment attributes
+                                if (maxRowsPerFlowFile > 0) {
+                                    // if row count is zero and this is not the first fragment, drop it instead of committing it.
+                                    if (nrOfRows.get() == 0 && fragmentIndex > 0) {
+                                        session.remove(resultSetFF);
+                                        break;
+                                    }
+
+                                    resultSetFF = session.putAttribute(resultSetFF, FRAGMENT_ID, fragmentId);
+                                    resultSetFF = session.putAttribute(resultSetFF, FRAGMENT_INDEX, String.valueOf(fragmentIndex));
+                                }
+
+                                logger.info("{} contains {} records; transferring to 'success'",
+                                        new Object[]{resultSetFF, nrOfRows.get()});
+                                // Report a FETCH event if there was an incoming flow file, or a RECEIVE event otherwise
+                                if(context.hasIncomingConnection()) {
+                                    session.getProvenanceReporter().fetch(resultSetFF, "Retrieved " + nrOfRows.get() + " rows", executionTimeElapsed + fetchTimeElapsed);
+                                } else {
+                                    session.getProvenanceReporter().receive(resultSetFF, "Retrieved " + nrOfRows.get() + " rows", executionTimeElapsed + fetchTimeElapsed);
+                                }
+                                resultSetFlowFiles.add(resultSetFF);
+
+                                // If we've reached the batch size, send out the flow files
+                                if (outputBatchSize > 0 && resultSetFlowFiles.size() >= outputBatchSize) {
+                                    session.transfer(resultSetFlowFiles, REL_SUCCESS);
+                                    session.commit();
+                                    resultSetFlowFiles.clear();
+                                }
+
+                                fragmentIndex++;
+                            } catch (Exception e) {
+                                // Remove the result set flow file and propagate the exception
+                                session.remove(resultSetFF);
+                                if (e instanceof ProcessException) {
+                                    throw (ProcessException) e;
+                                } else {
+                                    throw new ProcessException(e);
+                                }
+                            }
+                        } while (maxRowsPerFlowFile > 0 && nrOfRows.get() == maxRowsPerFlowFile);
+
+                        // If we are splitting results but not outputting batches, set count on all FlowFiles
+                        if (outputBatchSize == 0 && maxRowsPerFlowFile > 0) {
+                            for (int i = 0; i < resultSetFlowFiles.size(); i++) {
+                                resultSetFlowFiles.set(i,
+                                        session.putAttribute(resultSetFlowFiles.get(i), FRAGMENT_COUNT, Integer.toString(fragmentIndex)));
+                            }
+                        }
+                    } catch (final SQLException e) {
+                        throw new ProcessException(e);
+                    }
+
+                    resultCount++;
+                }
+
+                // are there any more result sets?
+                try {
+                    hasResults = st.getMoreResults(Statement.CLOSE_CURRENT_RESULT);
+                    hasUpdateCount = st.getUpdateCount() != -1;
+                } catch (SQLException ex) {
+                    hasResults = false;
+                    hasUpdateCount = false;
+                }
+            }
+
+            // Transfer any remaining files to SUCCESS
+            session.transfer(resultSetFlowFiles, REL_SUCCESS);
+            resultSetFlowFiles.clear();
+
+            //If we had at least one result then it's OK to drop the original file, but if we had no results then
+            //  pass the original flow file down the line to trigger downstream processors
+            if (fileToProcess != null) {
+                if (resultCount > 0) {
+                    session.remove(fileToProcess);
+                } else {
+                    fileToProcess = session.write(fileToProcess, out -> sqlWriter.writeEmptyResultSet(out, getLogger()));
+                    fileToProcess = session.putAttribute(fileToProcess, RESULT_ROW_COUNT, "0");
+                    fileToProcess = session.putAttribute(fileToProcess, CoreAttributes.MIME_TYPE.key(), sqlWriter.getMimeType());
+                    session.transfer(fileToProcess, REL_SUCCESS);
+                }
+            } else if (resultCount == 0) {
+                //If we had no inbound FlowFile, no exceptions, and the SQL generated no result sets (Insert/Update/Delete statements only)
+                // Then generate an empty Output FlowFile
+                FlowFile resultSetFF = session.create();
+
+                resultSetFF = session.write(resultSetFF, out -> sqlWriter.writeEmptyResultSet(out, getLogger()));
+                resultSetFF = session.putAttribute(resultSetFF, RESULT_ROW_COUNT, "0");
+                resultSetFF = session.putAttribute(resultSetFF, CoreAttributes.MIME_TYPE.key(), sqlWriter.getMimeType());
+                session.transfer(resultSetFF, REL_SUCCESS);
+            }
+        } catch (final ProcessException | SQLException e) {
+            // If the failure happened before an incoming FlowFile was obtained (or there was no incoming FlowFile at all),
+            //  there is nothing to route to failure; otherwise penalize the original FlowFile and route it to failure
+            if (fileToProcess == null) {
+                // This can happen if any exceptions occur while setting up the connection, statement, etc.
+                logger.error("Unable to execute SQL select query {} due to {}. No FlowFile to route to failure",
+                        new Object[]{selectQuery, e});
+                context.yield();
+            } else {
+                if (context.hasIncomingConnection()) {
+                    logger.error("Unable to execute SQL select query {} for {} due to {}; routing to failure",
+                            new Object[]{selectQuery, fileToProcess, e});
+                    fileToProcess = session.penalize(fileToProcess);
+                } else {
+                    logger.error("Unable to execute SQL select query {} due to {}; routing to failure",
+                            new Object[]{selectQuery, e});
+                    context.yield();
+                }
+                session.transfer(fileToProcess, REL_FAILURE);
+            }
+        }
+    }
+
+    protected abstract SqlWriter configureSqlWriter(ProcessSession session, ProcessContext context, FlowFile fileToProcess);
+}
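
The concrete processors in this commit (ExecuteSQL, ExecuteSQLRecord) only have to supply their property list, relationships, and a configureSqlWriter() implementation on top of this base class. A rough, hypothetical sketch of the smallest possible subclass, reusing the illustrative CsvSqlWriter from the SqlWriter section above (class and writer names are made up):

package org.apache.nifi.processors.standard;

import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;

import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processors.standard.sql.CsvSqlWriter;
import org.apache.nifi.processors.standard.sql.SqlWriter;

// Hypothetical example, not shipped with NiFi.
public class ExecuteSQLCsvExample extends AbstractExecuteSQL {

    public ExecuteSQLCsvExample() {
        final Set<Relationship> r = new HashSet<>();
        r.add(REL_SUCCESS);
        r.add(REL_FAILURE);
        relationships = Collections.unmodifiableSet(r);
        propDescriptors = Collections.unmodifiableList(Arrays.asList(
                DBCP_SERVICE, SQL_SELECT_QUERY, QUERY_TIMEOUT, MAX_ROWS_PER_FLOW_FILE, OUTPUT_BATCH_SIZE));
    }

    @Override
    protected SqlWriter configureSqlWriter(ProcessSession session, ProcessContext context, FlowFile fileToProcess) {
        return new CsvSqlWriter(); // the hypothetical writer sketched with the SqlWriter interface above
    }
}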

http://git-wip-us.apache.org/repos/asf/nifi/blob/c6572f04/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractQueryDatabaseTable.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractQueryDatabaseTable.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractQueryDatabaseTable.java
new file mode 100644
index 0000000..06df6c1
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractQueryDatabaseTable.java
@@ -0,0 +1,483 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.standard;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.annotation.lifecycle.OnStopped;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.state.Scope;
+import org.apache.nifi.components.state.StateManager;
+import org.apache.nifi.components.state.StateMap;
+import org.apache.nifi.dbcp.DBCPService;
+import org.apache.nifi.expression.AttributeExpression;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.flowfile.attributes.FragmentAttributes;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.ProcessSessionFactory;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.processors.standard.db.DatabaseAdapter;
+import org.apache.nifi.processors.standard.sql.SqlWriter;
+import org.apache.nifi.processors.standard.util.JdbcCommon;
+import org.apache.nifi.util.StopWatch;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.DatabaseMetaData;
+import java.sql.ResultSet;
+import java.sql.ResultSetMetaData;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.text.ParseException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.stream.IntStream;
+
+
+public abstract class AbstractQueryDatabaseTable extends AbstractDatabaseFetchProcessor {
+
+    public static final String RESULT_TABLENAME = "tablename";
+    public static final String RESULT_ROW_COUNT = "querydbtable.row.count";
+
+    public static final String FRAGMENT_ID = FragmentAttributes.FRAGMENT_ID.key();
+    public static final String FRAGMENT_INDEX = FragmentAttributes.FRAGMENT_INDEX.key();
+
+    public static final PropertyDescriptor FETCH_SIZE = new PropertyDescriptor.Builder()
+            .name("Fetch Size")
+            .description("The number of result rows to be fetched from the result set at a time. This is a hint to the database driver and may not be "
+                    + "honored and/or exact. If the value specified is zero, then the hint is ignored.")
+            .defaultValue("0")
+            .required(true)
+            .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .build();
+
+    public static final PropertyDescriptor MAX_ROWS_PER_FLOW_FILE = new PropertyDescriptor.Builder()
+            .name("qdbt-max-rows")
+            .displayName("Max Rows Per Flow File")
+            .description("The maximum number of result rows that will be included in a single FlowFile. This will allow you to break up very large "
+                    + "result sets into multiple FlowFiles. If the value specified is zero, then all rows are returned in a single FlowFile.")
+            .defaultValue("0")
+            .required(true)
+            .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .build();
+
+    public static final PropertyDescriptor OUTPUT_BATCH_SIZE = new PropertyDescriptor.Builder()
+            .name("qdbt-output-batch-size")
+            .displayName("Output Batch Size")
+            .description("The number of output FlowFiles to queue before committing the process session. When set to zero, the session will be committed when all result set rows "
+                    + "have been processed and the output FlowFiles are ready for transfer to the downstream relationship. For large result sets, this can cause a large burst of FlowFiles "
+                    + "to be transferred at the end of processor execution. If this property is set, then when the specified number of FlowFiles are ready for transfer, then the session will "
+                    + "be committed, thus releasing the FlowFiles to the downstream relationship. NOTE: The maxvalue.* and fragment.count attributes will not be set on FlowFiles when this "
+                    + "property is set.")
+            .defaultValue("0")
+            .required(true)
+            .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .build();
+
+    public static final PropertyDescriptor MAX_FRAGMENTS = new PropertyDescriptor.Builder()
+            .name("qdbt-max-frags")
+            .displayName("Maximum Number of Fragments")
+            .description("The maximum number of fragments. If the value specified is zero, then all fragments are returned. " +
+                    "This prevents OutOfMemoryError when this processor ingests huge table. NOTE: Setting this property can result in data loss, as the incoming results are "
+                    + "not ordered, and fragments may end at arbitrary boundaries where rows are not included in the result set.")
+            .defaultValue("0")
+            .required(true)
+            .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .build();
+
+    @Override
+    public Set<Relationship> getRelationships() {
+        return relationships;
+    }
+
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return propDescriptors;
+    }
+
+    @Override
+    protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) {
+        return new PropertyDescriptor.Builder()
+                .name(propertyDescriptorName)
+                .required(false)
+                .addValidator(StandardValidators.createAttributeExpressionLanguageValidator(AttributeExpression.ResultType.STRING, true))
+                .addValidator(StandardValidators.ATTRIBUTE_KEY_PROPERTY_NAME_VALIDATOR)
+                .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+                .dynamic(true)
+                .build();
+    }
+
+    @OnScheduled
+    public void setup(final ProcessContext context) {
+        maxValueProperties = getDefaultMaxValueProperties(context, null);
+    }
+
+    @OnStopped
+    public void stop() {
+        // Reset the column type map in case properties change
+        setupComplete.set(false);
+    }
+
+    @Override
+    public void onTrigger(final ProcessContext context, final ProcessSessionFactory sessionFactory) throws ProcessException {
+        // Fetch the column/table info once
+        if (!setupComplete.get()) {
+            super.setup(context);
+        }
+        ProcessSession session = sessionFactory.createSession();
+        final List<FlowFile> resultSetFlowFiles = new ArrayList<>();
+
+        final ComponentLog logger = getLogger();
+
+        final DBCPService dbcpService = context.getProperty(DBCP_SERVICE).asControllerService(DBCPService.class);
+        final DatabaseAdapter dbAdapter = dbAdapters.get(context.getProperty(DB_TYPE).getValue());
+        final String tableName = context.getProperty(TABLE_NAME).evaluateAttributeExpressions().getValue();
+        final String columnNames = context.getProperty(COLUMN_NAMES).evaluateAttributeExpressions().getValue();
+        final String sqlQuery = context.getProperty(SQL_QUERY).evaluateAttributeExpressions().getValue();
+        final String maxValueColumnNames = context.getProperty(MAX_VALUE_COLUMN_NAMES).evaluateAttributeExpressions().getValue();
+        final String customWhereClause = context.getProperty(WHERE_CLAUSE).evaluateAttributeExpressions().getValue();
+        final Integer fetchSize = context.getProperty(FETCH_SIZE).evaluateAttributeExpressions().asInteger();
+        final Integer maxRowsPerFlowFile = context.getProperty(MAX_ROWS_PER_FLOW_FILE).evaluateAttributeExpressions().asInteger();
+        final Integer outputBatchSizeField = context.getProperty(OUTPUT_BATCH_SIZE).evaluateAttributeExpressions().asInteger();
+        final int outputBatchSize = outputBatchSizeField == null ? 0 : outputBatchSizeField;
+        final Integer maxFragments = context.getProperty(MAX_FRAGMENTS).isSet()
+                ? context.getProperty(MAX_FRAGMENTS).evaluateAttributeExpressions().asInteger()
+                : 0;
+
+
+        SqlWriter sqlWriter = configureSqlWriter(session, context);
+
+        final StateManager stateManager = context.getStateManager();
+        final StateMap stateMap;
+
+        try {
+            stateMap = stateManager.getState(Scope.CLUSTER);
+        } catch (final IOException ioe) {
+            getLogger().error("Failed to retrieve observed maximum values from the State Manager. Will not perform "
+                    + "query until this is accomplished.", ioe);
+            context.yield();
+            return;
+        }
+        // Make a mutable copy of the current state property map. This will be updated by the result row callback, and eventually
+        // set as the current state map (after the session has been committed)
+        final Map<String, String> statePropertyMap = new HashMap<>(stateMap.toMap());
+
+        //If an initial max value for column(s) has been specified using properties, and this column is not in the state manager, sync them to the state property map
+        for (final Map.Entry<String, String> maxProp : maxValueProperties.entrySet()) {
+            String maxPropKey = maxProp.getKey().toLowerCase();
+            String fullyQualifiedMaxPropKey = getStateKey(tableName, maxPropKey, dbAdapter);
+            if (!statePropertyMap.containsKey(fullyQualifiedMaxPropKey)) {
+                String newMaxPropValue;
+                // If we can't find the value at the fully-qualified key name, it is possible (under a previous scheme)
+                // the value has been stored under a key that is only the column name. Fall back to check the column name,
+                // but store the new initial max value under the fully-qualified key.
+                if (statePropertyMap.containsKey(maxPropKey)) {
+                    newMaxPropValue = statePropertyMap.get(maxPropKey);
+                } else {
+                    newMaxPropValue = maxProp.getValue();
+                }
+                statePropertyMap.put(fullyQualifiedMaxPropKey, newMaxPropValue);
+
+            }
+        }
+
+        List<String> maxValueColumnNameList = StringUtils.isEmpty(maxValueColumnNames)
+                ? null
+                : Arrays.asList(maxValueColumnNames.split("\\s*,\\s*"));
+        final String selectQuery = getQuery(dbAdapter, tableName, sqlQuery, columnNames, maxValueColumnNameList, customWhereClause, statePropertyMap);
+        final StopWatch stopWatch = new StopWatch(true);
+        final String fragmentIdentifier = UUID.randomUUID().toString();
+
+        try (final Connection con = dbcpService.getConnection(Collections.emptyMap());
+             final Statement st = con.createStatement()) {
+
+            if (fetchSize != null && fetchSize > 0) {
+                try {
+                    st.setFetchSize(fetchSize);
+                } catch (SQLException se) {
+                    // Not all drivers support this, just log the error (at debug level) and move on
+                    logger.debug("Cannot set fetch size to {} due to {}", new Object[]{fetchSize, se.getLocalizedMessage()}, se);
+                }
+            }
+
+            String jdbcURL = "DBCPService";
+            try {
+                DatabaseMetaData databaseMetaData = con.getMetaData();
+                if (databaseMetaData != null) {
+                    jdbcURL = databaseMetaData.getURL();
+                }
+            } catch (SQLException se) {
+                // Ignore and use default JDBC URL. This shouldn't happen unless the driver doesn't implement getMetaData() properly
+            }
+
+            final Integer queryTimeout = context.getProperty(QUERY_TIMEOUT).evaluateAttributeExpressions().asTimePeriod(TimeUnit.SECONDS).intValue();
+            st.setQueryTimeout(queryTimeout); // timeout in seconds
+            if (logger.isDebugEnabled()) {
+                logger.debug("Executing query {}", new Object[] { selectQuery });
+            }
+            try (final ResultSet resultSet = st.executeQuery(selectQuery)) {
+                int fragmentIndex=0;
+                // Max values will be updated in the state property map by the callback
+                final MaxValueResultSetRowCollector maxValCollector = new MaxValueResultSetRowCollector(tableName, statePropertyMap, dbAdapter);
+
+                while(true) {
+                    final AtomicLong nrOfRows = new AtomicLong(0L);
+
+                    FlowFile fileToProcess = session.create();
+                    try {
+                        fileToProcess = session.write(fileToProcess, out -> {
+                            try {
+                                nrOfRows.set(sqlWriter.writeResultSet(resultSet, out, getLogger(), maxValCollector));
+                            } catch (Exception e) {
+                                throw new ProcessException("Error during database query or conversion of records.", e);
+                            }
+                        });
+                    } catch (ProcessException e) {
+                        // Add flowfile to results before rethrowing so it will be removed from session in outer catch
+                        resultSetFlowFiles.add(fileToProcess);
+                        throw e;
+                    }
+
+                    if (nrOfRows.get() > 0) {
+                        // set attributes
+                        final Map<String, String> attributesToAdd = new HashMap<>();
+                        attributesToAdd.put(RESULT_ROW_COUNT, String.valueOf(nrOfRows.get()));
+                        attributesToAdd.put(RESULT_TABLENAME, tableName);
+
+                        if(maxRowsPerFlowFile > 0) {
+                            attributesToAdd.put(FRAGMENT_ID, fragmentIdentifier);
+                            attributesToAdd.put(FRAGMENT_INDEX, String.valueOf(fragmentIndex));
+                        }
+
+                        attributesToAdd.putAll(sqlWriter.getAttributesToAdd());
+                        fileToProcess = session.putAllAttributes(fileToProcess, attributesToAdd);
+                        sqlWriter.updateCounters(session);
+
+                        logger.info("{} contains {} records; transferring to 'success'",
+                                new Object[]{fileToProcess, nrOfRows.get()});
+
+                        session.getProvenanceReporter().receive(fileToProcess, jdbcURL, stopWatch.getElapsed(TimeUnit.MILLISECONDS));
+                        resultSetFlowFiles.add(fileToProcess);
+                        // If we've reached the batch size, send out the flow files
+                        if (outputBatchSize > 0 && resultSetFlowFiles.size() >= outputBatchSize) {
+                            session.transfer(resultSetFlowFiles, REL_SUCCESS);
+                            session.commit();
+                            resultSetFlowFiles.clear();
+                        }
+                    } else {
+                        // If there were no rows returned, don't send the flowfile
+                        session.remove(fileToProcess);
+                        // If no rows were returned and this was the first FlowFile, yield
+                        if(fragmentIndex == 0){
+                            context.yield();
+                        }
+                        break;
+                    }
+
+                    fragmentIndex++;
+                    if (maxFragments > 0 && fragmentIndex >= maxFragments) {
+                        break;
+                    }
+
+                    // If we aren't splitting up the data into flow files or fragments, then the result set has been entirely fetched so don't loop back around
+                    if (maxFragments == 0 && maxRowsPerFlowFile == 0) {
+                        break;
+                    }
+
+                    // If we are splitting up the data into flow files, don't loop back around if we've gotten all results
+                    if(maxRowsPerFlowFile > 0 && nrOfRows.get() < maxRowsPerFlowFile) {
+                        break;
+                    }
+                }
+
+                // Apply state changes from the Max Value tracker
+                maxValCollector.applyStateChanges();
+
+                // Even though the maximum values and total count are known at this point, do not store them as attributes when Output Batch Size is set, to keep behavior consistent
+                if (outputBatchSize == 0) {
+                    for (int i = 0; i < resultSetFlowFiles.size(); i++) {
+                        // Add maximum values as attributes
+                        for (Map.Entry<String, String> entry : statePropertyMap.entrySet()) {
+                            // Get just the column name from the key
+                            String key = entry.getKey();
+                            String colName = key.substring(key.lastIndexOf(NAMESPACE_DELIMITER) + NAMESPACE_DELIMITER.length());
+                            resultSetFlowFiles.set(i, session.putAttribute(resultSetFlowFiles.get(i), "maxvalue." + colName, entry.getValue()));
+                        }
+
+                        //set count on all FlowFiles
+                        if (maxRowsPerFlowFile > 0) {
+                            resultSetFlowFiles.set(i,
+                                    session.putAttribute(resultSetFlowFiles.get(i), "fragment.count", Integer.toString(fragmentIndex)));
+                        }
+                    }
+                }
+            } catch (final SQLException e) {
+                throw e;
+            }
+
+            session.transfer(resultSetFlowFiles, REL_SUCCESS);
+
+        } catch (final ProcessException | SQLException e) {
+            logger.error("Unable to execute SQL select query {} due to {}", new Object[]{selectQuery, e});
+            if (!resultSetFlowFiles.isEmpty()) {
+                session.remove(resultSetFlowFiles);
+            }
+            context.yield();
+        } finally {
+            session.commit();
+            try {
+                // Update the state
+                stateManager.setState(statePropertyMap, Scope.CLUSTER);
+            } catch (IOException ioe) {
+                getLogger().error("{} failed to update State Manager, maximum observed values will not be recorded", new Object[]{this, ioe});
+            }
+        }
+    }
+
+    protected String getQuery(DatabaseAdapter dbAdapter, String tableName, String columnNames, List<String> maxValColumnNames,
+                              String customWhereClause, Map<String, String> stateMap) {
+
+        return getQuery(dbAdapter, tableName, null, columnNames, maxValColumnNames, customWhereClause, stateMap);
+    }
+
+    protected String getQuery(DatabaseAdapter dbAdapter, String tableName, String sqlQuery, String columnNames, List<String> maxValColumnNames,
+                              String customWhereClause, Map<String, String> stateMap) {
+        if (StringUtils.isEmpty(tableName)) {
+            throw new IllegalArgumentException("Table name must be specified");
+        }
+        final StringBuilder query;
+
+        if (StringUtils.isEmpty(sqlQuery)) {
+            query = new StringBuilder(dbAdapter.getSelectStatement(tableName, columnNames, null, null, null, null));
+        } else {
+            query = getWrappedQuery(dbAdapter, sqlQuery, tableName);
+        }
+
+        List<String> whereClauses = new ArrayList<>();
+        // Check state map for last max values
+        if (stateMap != null && !stateMap.isEmpty() && maxValColumnNames != null) {
+            IntStream.range(0, maxValColumnNames.size()).forEach((index) -> {
+                String colName = maxValColumnNames.get(index);
+                String maxValueKey = getStateKey(tableName, colName, dbAdapter);
+                String maxValue = stateMap.get(maxValueKey);
+                if (StringUtils.isEmpty(maxValue)) {
+                    // If we can't find the value at the fully-qualified key name, it is possible (under a previous scheme)
+                    // the value has been stored under a key that is only the column name. Fall back to check the column name; either way, when a new
+                    // maximum value is observed, it will be stored under the fully-qualified key from then on.
+                    maxValue = stateMap.get(colName.toLowerCase());
+                }
+                if (!StringUtils.isEmpty(maxValue)) {
+                    Integer type = columnTypeMap.get(maxValueKey);
+                    if (type == null) {
+                        // This shouldn't happen as we are populating columnTypeMap when the processor is scheduled.
+                        throw new IllegalArgumentException("No column type found for: " + colName);
+                    }
+                    // Add a condition for the WHERE clause
+                    whereClauses.add(colName + (index == 0 ? " > " : " >= ") + getLiteralByType(type, maxValue, dbAdapter.getName()));
+                }
+            });
+        }
+
+        if (customWhereClause != null) {
+            whereClauses.add("(" + customWhereClause + ")");
+        }
+
+        if (!whereClauses.isEmpty()) {
+            query.append(" WHERE ");
+            query.append(StringUtils.join(whereClauses, " AND "));
+        }
+
+        return query.toString();
+    }
+
+    public class MaxValueResultSetRowCollector implements JdbcCommon.ResultSetRowCallback {
+        DatabaseAdapter dbAdapter;
+        final Map<String, String> newColMap;
+        final Map<String, String> originalState;
+        String tableName;
+
+        public MaxValueResultSetRowCollector(String tableName, Map<String, String> stateMap, DatabaseAdapter dbAdapter) {
+            this.dbAdapter = dbAdapter;
+            this.originalState = stateMap;
+
+            this.newColMap = new HashMap<>();
+            this.newColMap.putAll(stateMap);
+
+            this.tableName = tableName;
+        }
+
+        @Override
+        public void processRow(ResultSet resultSet) throws IOException {
+            if (resultSet == null) {
+                return;
+            }
+            try {
+                // Iterate over the row, check-and-set max values
+                final ResultSetMetaData meta = resultSet.getMetaData();
+                final int nrOfColumns = meta.getColumnCount();
+                if (nrOfColumns > 0) {
+                    for (int i = 1; i <= nrOfColumns; i++) {
+                        String colName = meta.getColumnName(i).toLowerCase();
+                        String fullyQualifiedMaxValueKey = getStateKey(tableName, colName, dbAdapter);
+                        Integer type = columnTypeMap.get(fullyQualifiedMaxValueKey);
+                        // Skip any columns we're not keeping track of or whose value is null
+                        if (type == null || resultSet.getObject(i) == null) {
+                            continue;
+                        }
+                        String maxValueString = newColMap.get(fullyQualifiedMaxValueKey);
+                        // If we can't find the value at the fully-qualified key name, it is possible (under a previous scheme)
+                        // the value has been stored under a key that is only the column name. Fall back to check the column name; either way, when a new
+                        // maximum value is observed, it will be stored under the fully-qualified key from then on.
+                        if (StringUtils.isEmpty(maxValueString)) {
+                            maxValueString = newColMap.get(colName);
+                        }
+                        String newMaxValueString = getMaxValueFromRow(resultSet, i, type, maxValueString, dbAdapter.getName());
+                        if (newMaxValueString != null) {
+                            newColMap.put(fullyQualifiedMaxValueKey, newMaxValueString);
+                        }
+                    }
+                }
+            } catch (ParseException | SQLException e) {
+                throw new IOException(e);
+            }
+        }
+
+        @Override
+        public void applyStateChanges() {
+            this.originalState.putAll(this.newColMap);
+        }
+    }
+
+    protected abstract SqlWriter configureSqlWriter(ProcessSession session, ProcessContext context);
+}
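
To make the incremental-fetch logic concrete: getQuery() above starts from the adapter's SELECT statement (or wraps a custom SQL query), then appends one comparison per maximum-value column using the values held in state, with ">" for the first column and ">=" for the rest, plus any custom WHERE clause joined with AND. For example (table name, column names, and stored state values here are purely illustrative), with maximum-value columns "id, updated_at" and previously captured state of id=100 and updated_at='2018-09-01 00:00:00', the generated query would look roughly like:

    SELECT * FROM orders WHERE id > 100 AND updated_at >= '2018-09-01 00:00:00'

Only rows beyond the last observed maxima are fetched on the next trigger, and MaxValueResultSetRowCollector updates the stored maxima as rows stream through the SqlWriter.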

http://git-wip-us.apache.org/repos/asf/nifi/blob/c6572f04/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteSQL.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteSQL.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteSQL.java
index df82e2e..cc6d508 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteSQL.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteSQL.java
@@ -16,22 +16,12 @@
  */
 package org.apache.nifi.processors.standard;
 
-import java.nio.charset.Charset;
-import java.sql.Connection;
-import java.sql.PreparedStatement;
-import java.sql.ResultSet;
-import java.sql.SQLException;
-import java.sql.Statement;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
-import java.util.UUID;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicLong;
 
-import org.apache.commons.io.IOUtils;
 import org.apache.nifi.annotation.behavior.EventDriven;
 import org.apache.nifi.annotation.behavior.InputRequirement;
 import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
@@ -41,23 +31,16 @@ import org.apache.nifi.annotation.behavior.WritesAttribute;
 import org.apache.nifi.annotation.behavior.WritesAttributes;
 import org.apache.nifi.annotation.documentation.CapabilityDescription;
 import org.apache.nifi.annotation.documentation.Tags;
-import org.apache.nifi.annotation.lifecycle.OnScheduled;
 import org.apache.nifi.components.PropertyDescriptor;
-import org.apache.nifi.dbcp.DBCPService;
 import org.apache.nifi.expression.ExpressionLanguageScope;
 import org.apache.nifi.flowfile.FlowFile;
-import org.apache.nifi.flowfile.attributes.CoreAttributes;
-import org.apache.nifi.flowfile.attributes.FragmentAttributes;
-import org.apache.nifi.logging.ComponentLog;
-import org.apache.nifi.processor.AbstractProcessor;
 import org.apache.nifi.processor.ProcessContext;
 import org.apache.nifi.processor.ProcessSession;
 import org.apache.nifi.processor.Relationship;
-import org.apache.nifi.processor.exception.ProcessException;
-import org.apache.nifi.processor.util.StandardValidators;
-import org.apache.nifi.processors.standard.util.AvroUtil.CodecType;
+import org.apache.nifi.processors.standard.sql.DefaultAvroSqlWriter;
+import org.apache.nifi.processors.standard.sql.SqlWriter;
 import org.apache.nifi.processors.standard.util.JdbcCommon;
-import org.apache.nifi.util.StopWatch;
+import org.apache.nifi.processors.standard.util.AvroUtil.CodecType;
 
 import static org.apache.nifi.processors.standard.util.JdbcCommon.DEFAULT_PRECISION;
 import static org.apache.nifi.processors.standard.util.JdbcCommon.DEFAULT_SCALE;
@@ -94,99 +77,24 @@ import static org.apache.nifi.processors.standard.util.JdbcCommon.USE_AVRO_LOGIC
                 + "'yyyy-MM-dd HH:mm:ss.SSS' for Timestamp is used.")
 })
 @WritesAttributes({
-    @WritesAttribute(attribute="executesql.row.count", description = "Contains the number of rows returned in the select query"),
-    @WritesAttribute(attribute="executesql.query.duration", description = "Combined duration of the query execution time and fetch time in milliseconds"),
-    @WritesAttribute(attribute="executesql.query.executiontime", description = "Duration of the query execution time in milliseconds"),
-    @WritesAttribute(attribute="executesql.query.fetchtime", description = "Duration of the result set fetch time in milliseconds"),
-    @WritesAttribute(attribute="executesql.resultset.index", description = "Assuming multiple result sets are returned, "
-       + "the zero based index of this result set."),
-    @WritesAttribute(attribute="fragment.identifier", description="If 'Max Rows Per Flow File' is set then all FlowFiles from the same query result set "
-            + "will have the same value for the fragment.identifier attribute. This can then be used to correlate the results."),
-    @WritesAttribute(attribute = "fragment.count", description = "If 'Max Rows Per Flow File' is set then this is the total number of  "
-            + "FlowFiles produced by a single ResultSet. This can be used in conjunction with the "
-            + "fragment.identifier attribute in order to know how many FlowFiles belonged to the same incoming ResultSet. If Output Batch Size is set, then this "
-            + "attribute will not be populated."),
-    @WritesAttribute(attribute="fragment.index", description="If 'Max Rows Per Flow File' is set then the position of this FlowFile in the list of "
-            + "outgoing FlowFiles that were all derived from the same result set FlowFile. This can be "
-            + "used in conjunction with the fragment.identifier attribute to know which FlowFiles originated from the same query result set and in what order  "
+        @WritesAttribute(attribute = "executesql.row.count", description = "Contains the number of rows returned in the select query"),
+        @WritesAttribute(attribute = "executesql.query.duration", description = "Combined duration of the query execution time and fetch time in milliseconds"),
+        @WritesAttribute(attribute = "executesql.query.executiontime", description = "Duration of the query execution time in milliseconds"),
+        @WritesAttribute(attribute = "executesql.query.fetchtime", description = "Duration of the result set fetch time in milliseconds"),
+        @WritesAttribute(attribute = "executesql.resultset.index", description = "Assuming multiple result sets are returned, "
+                + "the zero based index of this result set."),
+        @WritesAttribute(attribute = "fragment.identifier", description = "If 'Max Rows Per Flow File' is set then all FlowFiles from the same query result set "
+                + "will have the same value for the fragment.identifier attribute. This can then be used to correlate the results."),
+        @WritesAttribute(attribute = "fragment.count", description = "If 'Max Rows Per Flow File' is set then this is the total number of  "
+                + "FlowFiles produced by a single ResultSet. This can be used in conjunction with the "
+                + "fragment.identifier attribute in order to know how many FlowFiles belonged to the same incoming ResultSet. If Output Batch Size is set, then this "
+                + "attribute will not be populated."),
+        @WritesAttribute(attribute = "fragment.index", description = "If 'Max Rows Per Flow File' is set then the position of this FlowFile in the list of "
+                + "outgoing FlowFiles that were all derived from the same result set FlowFile. This can be "
+                + "used in conjunction with the fragment.identifier attribute to know which FlowFiles originated from the same query result set and in what order  "
                 + "FlowFiles were produced")
 })
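
[Editorial note] The fragment.* attributes documented above are what let downstream components re-associate the FlowFiles split out of one result set (for example, MergeContent's Defragment merge strategy keys off them). A minimal sketch of checking them from a unit test, assuming a TestRunner named runner that has already executed the processor with 'Max Rows Per Flow File' set; illustrative only, not part of this commit:

    final List<MockFlowFile> fragments = runner.getFlowFilesTransferredTo(ExecuteSQL.REL_SUCCESS);
    final String fragmentId = fragments.get(0).getAttribute("fragment.identifier");
    for (int i = 0; i < fragments.size(); i++) {
        // every fragment of the same result set carries the same identifier and a sequential index
        fragments.get(i).assertAttributeEquals("fragment.identifier", fragmentId);
        fragments.get(i).assertAttributeEquals("fragment.index", String.valueOf(i));
    }
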
-public class ExecuteSQL extends AbstractProcessor {
-
-    public static final String RESULT_ROW_COUNT = "executesql.row.count";
-    public static final String RESULT_QUERY_DURATION = "executesql.query.duration";
-    public static final String RESULT_QUERY_EXECUTION_TIME = "executesql.query.executiontime";
-    public static final String RESULT_QUERY_FETCH_TIME = "executesql.query.fetchtime";
-    public static final String RESULTSET_INDEX = "executesql.resultset.index";
-
-    public static final String FRAGMENT_ID = FragmentAttributes.FRAGMENT_ID.key();
-    public static final String FRAGMENT_INDEX = FragmentAttributes.FRAGMENT_INDEX.key();
-    public static final String FRAGMENT_COUNT = FragmentAttributes.FRAGMENT_COUNT.key();
-
-    // Relationships
-    public static final Relationship REL_SUCCESS = new Relationship.Builder()
-            .name("success")
-            .description("Successfully created FlowFile from SQL query result set.")
-            .build();
-    public static final Relationship REL_FAILURE = new Relationship.Builder()
-            .name("failure")
-            .description("SQL query execution failed. Incoming FlowFile will be penalized and routed to this relationship")
-            .build();
-    private final Set<Relationship> relationships;
-
-    public static final PropertyDescriptor DBCP_SERVICE = new PropertyDescriptor.Builder()
-            .name("Database Connection Pooling Service")
-            .description("The Controller Service that is used to obtain connection to database")
-            .required(true)
-            .identifiesControllerService(DBCPService.class)
-            .build();
-
-    public static final PropertyDescriptor SQL_SELECT_QUERY = new PropertyDescriptor.Builder()
-            .name("SQL select query")
-            .description("The SQL select query to execute. The query can be empty, a constant value, or built from attributes "
-                    + "using Expression Language. If this property is specified, it will be used regardless of the content of "
-                    + "incoming flowfiles. If this property is empty, the content of the incoming flow file is expected "
-                    + "to contain a valid SQL select query, to be issued by the processor to the database. Note that Expression "
-                    + "Language is not evaluated for flow file contents.")
-            .required(false)
-            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
-            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
-            .build();
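
[Editorial note] As this (now relocated) description explains, the query can be supplied either through the property, with Expression Language evaluated against the incoming FlowFile's attributes, or as the content of the incoming FlowFile, in which case Expression Language is not applied. A rough sketch of the two options from a test, assuming a configured TestRunner named runner and assuming the descriptor is now inherited from AbstractExecuteSQL; illustrative only:

    // Option 1: query in the property; ${table.name} is a hypothetical FlowFile attribute
    runner.setProperty(ExecuteSQL.SQL_SELECT_QUERY, "SELECT * FROM ${table.name}");

    // Option 2: leave the property unset and send the query as the FlowFile content
    runner.enqueue("SELECT * FROM persons".getBytes());
    runner.run();
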
-
-    public static final PropertyDescriptor QUERY_TIMEOUT = new PropertyDescriptor.Builder()
-            .name("Max Wait Time")
-            .description("The maximum amount of time allowed for a running SQL select query "
-                    + " , zero means there is no limit. Max time less than 1 second will be equal to zero.")
-            .defaultValue("0 seconds")
-            .required(true)
-            .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
-            .sensitive(false)
-            .build();
-
-    public static final PropertyDescriptor MAX_ROWS_PER_FLOW_FILE = new PropertyDescriptor.Builder()
-            .name("esql-max-rows")
-            .displayName("Max Rows Per Flow File")
-            .description("The maximum number of result rows that will be included in a single FlowFile. This will allow you to break up very large "
-                    + "result sets into multiple FlowFiles. If the value specified is zero, then all rows are returned in a single FlowFile.")
-            .defaultValue("0")
-            .required(true)
-            .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
-            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
-            .build();
-
-    public static final PropertyDescriptor OUTPUT_BATCH_SIZE = new PropertyDescriptor.Builder()
-            .name("esql-output-batch-size")
-            .displayName("Output Batch Size")
-            .description("The number of output FlowFiles to queue before committing the process session. When set to zero, the session will be committed when all result set rows "
-                    + "have been processed and the output FlowFiles are ready for transfer to the downstream relationship. For large result sets, this can cause a large burst of FlowFiles "
-                    + "to be transferred at the end of processor execution. If this property is set, then when the specified number of FlowFiles are ready for transfer, then the session will "
-                    + "be committed, thus releasing the FlowFiles to the downstream relationship. NOTE: The fragment.count attribute will not be set on FlowFiles when this "
-                    + "property is set.")
-            .defaultValue("0")
-            .required(true)
-            .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
-            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
-            .build();
+public class ExecuteSQL extends AbstractExecuteSQL {
 
     public static final PropertyDescriptor COMPRESSION_FORMAT = new PropertyDescriptor.Builder()
             .name("compression-format")
@@ -198,8 +106,6 @@ public class ExecuteSQL extends AbstractProcessor {
             .required(true)
             .build();
 
-    private final List<PropertyDescriptor> propDescriptors;
-
     public ExecuteSQL() {
         final Set<Relationship> r = new HashSet<>();
         r.add(REL_SUCCESS);
@@ -212,248 +118,31 @@ public class ExecuteSQL extends AbstractProcessor {
         pds.add(QUERY_TIMEOUT);
         pds.add(NORMALIZE_NAMES_FOR_AVRO);
         pds.add(USE_AVRO_LOGICAL_TYPES);
+        pds.add(COMPRESSION_FORMAT);
         pds.add(DEFAULT_PRECISION);
         pds.add(DEFAULT_SCALE);
         pds.add(MAX_ROWS_PER_FLOW_FILE);
         pds.add(OUTPUT_BATCH_SIZE);
-        pds.add(COMPRESSION_FORMAT);
         propDescriptors = Collections.unmodifiableList(pds);
     }
 
     @Override
-    public Set<Relationship> getRelationships() {
-        return relationships;
-    }
-
-    @Override
-    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
-        return propDescriptors;
-    }
-
-    @OnScheduled
-    public void setup(ProcessContext context) {
-        // If the query is not set, then an incoming flow file is needed. Otherwise fail the initialization
-        if (!context.getProperty(SQL_SELECT_QUERY).isSet() && !context.hasIncomingConnection()) {
-            final String errorString = "Either the Select Query must be specified or there must be an incoming connection "
-                    + "providing flowfile(s) containing a SQL select query";
-            getLogger().error(errorString);
-            throw new ProcessException(errorString);
-        }
-    }
-
-    @Override
-    public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
-        FlowFile fileToProcess = null;
-        if (context.hasIncomingConnection()) {
-            fileToProcess = session.get();
-
-            // If we have no FlowFile, and all incoming connections are self-loops then we can continue on.
-            // However, if we have no FlowFile and we have connections coming from other Processors, then
-            // we know that we should run only if we have a FlowFile.
-            if (fileToProcess == null && context.hasNonLoopConnection()) {
-                return;
-            }
-        }
-
-        final List<FlowFile> resultSetFlowFiles = new ArrayList<>();
-
-        final ComponentLog logger = getLogger();
-        final DBCPService dbcpService = context.getProperty(DBCP_SERVICE).asControllerService(DBCPService.class);
-        final Integer queryTimeout = context.getProperty(QUERY_TIMEOUT).asTimePeriod(TimeUnit.SECONDS).intValue();
+    protected SqlWriter configureSqlWriter(ProcessSession session, ProcessContext context, FlowFile fileToProcess) {
         final boolean convertNamesForAvro = context.getProperty(NORMALIZE_NAMES_FOR_AVRO).asBoolean();
         final Boolean useAvroLogicalTypes = context.getProperty(USE_AVRO_LOGICAL_TYPES).asBoolean();
         final Integer maxRowsPerFlowFile = context.getProperty(MAX_ROWS_PER_FLOW_FILE).evaluateAttributeExpressions().asInteger();
-        final Integer outputBatchSizeField = context.getProperty(OUTPUT_BATCH_SIZE).evaluateAttributeExpressions().asInteger();
-        final int outputBatchSize = outputBatchSizeField == null ? 0 : outputBatchSizeField;
         final Integer defaultPrecision = context.getProperty(DEFAULT_PRECISION).evaluateAttributeExpressions(fileToProcess).asInteger();
         final Integer defaultScale = context.getProperty(DEFAULT_SCALE).evaluateAttributeExpressions(fileToProcess).asInteger();
         final String codec = context.getProperty(COMPRESSION_FORMAT).getValue();
 
-        final String selectQuery;
-        if (context.getProperty(SQL_SELECT_QUERY).isSet()) {
-            selectQuery = context.getProperty(SQL_SELECT_QUERY).evaluateAttributeExpressions(fileToProcess).getValue();
-        } else {
-            // If the query is not set, then an incoming flow file is required, and expected to contain a valid SQL select query.
-            // If there is no incoming connection, onTrigger will not be called as the processor will fail when scheduled.
-            final StringBuilder queryContents = new StringBuilder();
-            session.read(fileToProcess, in -> queryContents.append(IOUtils.toString(in, Charset.defaultCharset())));
-            selectQuery = queryContents.toString();
-        }
-
-        int resultCount=0;
-        try (final Connection con = dbcpService.getConnection(fileToProcess == null ? Collections.emptyMap() : fileToProcess.getAttributes());
-            final PreparedStatement st = con.prepareStatement(selectQuery)) {
-            st.setQueryTimeout(queryTimeout); // timeout in seconds
-
-            if (fileToProcess != null) {
-                JdbcCommon.setParameters(st, fileToProcess.getAttributes());
-            }
-            logger.debug("Executing query {}", new Object[]{selectQuery});
-
-            int fragmentIndex=0;
-            final String fragmentId = UUID.randomUUID().toString();
-
-            final StopWatch executionTime = new StopWatch(true);
-
-            boolean hasResults = st.execute();
-
-            long executionTimeElapsed = executionTime.getElapsed(TimeUnit.MILLISECONDS);
-
-            boolean hasUpdateCount = st.getUpdateCount() != -1;
-
-            while(hasResults || hasUpdateCount) {
-                //getMoreResults() and execute() return false to indicate that the result of the statement is just a number and not a ResultSet
-                if (hasResults) {
-                    final AtomicLong nrOfRows = new AtomicLong(0L);
-
-                    try {
-                        final ResultSet resultSet = st.getResultSet();
-                        final JdbcCommon.AvroConversionOptions options = JdbcCommon.AvroConversionOptions.builder()
-                                .convertNames(convertNamesForAvro)
-                                .useLogicalTypes(useAvroLogicalTypes)
-                                .defaultPrecision(defaultPrecision)
-                                .defaultScale(defaultScale)
-                                .maxRows(maxRowsPerFlowFile)
-                                .codecFactory(codec)
-                                .build();
-
-                        do {
-                            final StopWatch fetchTime = new StopWatch(true);
-
-                            FlowFile resultSetFF;
-                            if (fileToProcess == null) {
-                                resultSetFF = session.create();
-                            } else {
-                                resultSetFF = session.create(fileToProcess);
-                                resultSetFF = session.putAllAttributes(resultSetFF, fileToProcess.getAttributes());
-                            }
-
-                            try {
-                                resultSetFF = session.write(resultSetFF, out -> {
-                                    try {
-                                        nrOfRows.set(JdbcCommon.convertToAvroStream(resultSet, out, options, null));
-                                    } catch (SQLException e) {
-                                        throw new ProcessException(e);
-                                    }
-                                });
-
-                                long fetchTimeElapsed = fetchTime.getElapsed(TimeUnit.MILLISECONDS);
-
-                                // set attribute how many rows were selected
-                                resultSetFF = session.putAttribute(resultSetFF, RESULT_ROW_COUNT, String.valueOf(nrOfRows.get()));
-                                resultSetFF = session.putAttribute(resultSetFF, RESULT_QUERY_DURATION, String.valueOf(executionTimeElapsed + fetchTimeElapsed));
-                                resultSetFF = session.putAttribute(resultSetFF, RESULT_QUERY_EXECUTION_TIME, String.valueOf(executionTimeElapsed));
-                                resultSetFF = session.putAttribute(resultSetFF, RESULT_QUERY_FETCH_TIME, String.valueOf(fetchTimeElapsed));
-                                resultSetFF = session.putAttribute(resultSetFF, CoreAttributes.MIME_TYPE.key(), JdbcCommon.MIME_TYPE_AVRO_BINARY);
-                                resultSetFF = session.putAttribute(resultSetFF, RESULTSET_INDEX, String.valueOf(resultCount));
-
-                                // if fragmented ResultSet, determine if we should keep this fragment; set fragment attributes
-                                if (maxRowsPerFlowFile > 0) {
-                                    // if row count is zero and this is not the first fragment, drop it instead of committing it.
-                                    if (nrOfRows.get() == 0 && fragmentIndex > 0) {
-                                        session.remove(resultSetFF);
-                                        break;
-                                    }
-
-                                    resultSetFF = session.putAttribute(resultSetFF, FRAGMENT_ID, fragmentId);
-                                    resultSetFF = session.putAttribute(resultSetFF, FRAGMENT_INDEX, String.valueOf(fragmentIndex));
-                                }
-
-                                logger.info("{} contains {} Avro records; transferring to 'success'",
-                                        new Object[]{resultSetFF, nrOfRows.get()});
-                                session.getProvenanceReporter().modifyContent(resultSetFF, "Retrieved " + nrOfRows.get() + " rows", executionTimeElapsed + fetchTimeElapsed);
-                                resultSetFlowFiles.add(resultSetFF);
-
-                                // If we've reached the batch size, send out the flow files
-                                if (outputBatchSize > 0 && resultSetFlowFiles.size() >= outputBatchSize) {
-                                    session.transfer(resultSetFlowFiles, REL_SUCCESS);
-                                    session.commit();
-                                    resultSetFlowFiles.clear();
-                                }
-
-                                fragmentIndex++;
-                            } catch (Exception e) {
-                                // Remove the result set flow file and propagate the exception
-                                session.remove(resultSetFF);
-                                if (e instanceof ProcessException) {
-                                    throw (ProcessException) e;
-                                } else {
-                                    throw new ProcessException(e);
-                                }
-                            }
-                        } while (maxRowsPerFlowFile > 0 && nrOfRows.get() == maxRowsPerFlowFile);
-
-                        // If we are splitting results but not outputting batches, set count on all FlowFiles
-                        if (outputBatchSize == 0 && maxRowsPerFlowFile > 0) {
-                            for (int i = 0; i < resultSetFlowFiles.size(); i++) {
-                                resultSetFlowFiles.set(i,
-                                        session.putAttribute(resultSetFlowFiles.get(i), FRAGMENT_COUNT, Integer.toString(fragmentIndex)));
-                            }
-                        }
-                    } catch (final SQLException e) {
-                        throw new ProcessException(e);
-                    }
-
-                    resultCount++;
-                }
-
-                // are there anymore result sets?
-                try{
-                    hasResults = st.getMoreResults(Statement.CLOSE_CURRENT_RESULT);
-                    hasUpdateCount = st.getUpdateCount() != -1;
-                } catch(SQLException ex){
-                    hasResults = false;
-                    hasUpdateCount = false;
-                }
-            }
-
-            // Transfer any remaining files to SUCCESS
-            session.transfer(resultSetFlowFiles, REL_SUCCESS);
-            resultSetFlowFiles.clear();
-
-            //If we had at least one result then it's OK to drop the original file, but if we had no results then
-            //  pass the original flow file down the line to trigger downstream processors
-            if(fileToProcess != null){
-                if(resultCount > 0){
-                    session.remove(fileToProcess);
-                } else {
-                    fileToProcess = session.write(fileToProcess, JdbcCommon::createEmptyAvroStream);
-
-                    fileToProcess = session.putAttribute(fileToProcess, RESULT_ROW_COUNT, "0");
-                    fileToProcess = session.putAttribute(fileToProcess, CoreAttributes.MIME_TYPE.key(), JdbcCommon.MIME_TYPE_AVRO_BINARY);
-                    session.transfer(fileToProcess, REL_SUCCESS);
-                }
-            } else if(resultCount == 0){
-                //If we had no inbound FlowFile, no exceptions, and the SQL generated no result sets (Insert/Update/Delete statements only)
-                // Then generate an empty Output FlowFile
-                FlowFile resultSetFF = session.create();
-
-                resultSetFF = session.write(resultSetFF, out -> JdbcCommon.createEmptyAvroStream(out));
-
-                resultSetFF = session.putAttribute(resultSetFF, RESULT_ROW_COUNT, "0");
-                resultSetFF = session.putAttribute(resultSetFF, CoreAttributes.MIME_TYPE.key(), JdbcCommon.MIME_TYPE_AVRO_BINARY);
-                session.transfer(resultSetFF, REL_SUCCESS);
-            }
-        } catch (final ProcessException | SQLException e) {
-            //If we had at least one result then it's OK to drop the original file, but if we had no results then
-            //  pass the original flow file down the line to trigger downstream processors
-            if (fileToProcess == null) {
-                // This can happen if any exceptions occur while setting up the connection, statement, etc.
-                logger.error("Unable to execute SQL select query {} due to {}. No FlowFile to route to failure",
-                        new Object[]{selectQuery, e});
-                context.yield();
-            } else {
-                if (context.hasIncomingConnection()) {
-                    logger.error("Unable to execute SQL select query {} for {} due to {}; routing to failure",
-                            new Object[]{selectQuery, fileToProcess, e});
-                    fileToProcess = session.penalize(fileToProcess);
-                } else {
-                    logger.error("Unable to execute SQL select query {} due to {}; routing to failure",
-                            new Object[]{selectQuery, e});
-                    context.yield();
-                }
-                session.transfer(fileToProcess, REL_FAILURE);
-            }
-        }
+        final JdbcCommon.AvroConversionOptions options = JdbcCommon.AvroConversionOptions.builder()
+                .convertNames(convertNamesForAvro)
+                .useLogicalTypes(useAvroLogicalTypes)
+                .defaultPrecision(defaultPrecision)
+                .defaultScale(defaultScale)
+                .maxRows(maxRowsPerFlowFile)
+                .codecFactory(codec)
+                .build();
+        return new DefaultAvroSqlWriter(options);
     }
 }
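
[Editorial note] Net effect of this hunk: ExecuteSQL drops its own relationship and property plumbing, the @OnScheduled setup() check, and the entire Avro-writing onTrigger() loop, and becomes a thin subclass of the new AbstractExecuteSQL. The only format-specific piece left is configureSqlWriter(), which packages the Avro-related properties into JdbcCommon.AvroConversionOptions and returns a DefaultAvroSqlWriter; the record-oriented ExecuteSQLRecord added by this commit presumably differs only in which SqlWriter it returns. A minimal sketch of driving the refactored processor from a unit test, assuming a DBCPService test implementation named dbcp backed by a test database with a hypothetical persons table, and omitting the surrounding test class; illustrative only, not part of the commit:

    final TestRunner runner = TestRunners.newTestRunner(ExecuteSQL.class);
    runner.addControllerService("dbcp", dbcp);            // dbcp is an assumed DBCPService test implementation
    runner.enableControllerService(dbcp);
    runner.setProperty(ExecuteSQL.DBCP_SERVICE, "dbcp");  // descriptor now inherited from AbstractExecuteSQL
    runner.setProperty(ExecuteSQL.SQL_SELECT_QUERY, "SELECT * FROM persons");
    runner.setProperty(ExecuteSQL.MAX_ROWS_PER_FLOW_FILE, "5");
    runner.run();

    // all result FlowFiles should be routed to 'success'
    runner.assertAllFlowFilesTransferred(ExecuteSQL.REL_SUCCESS);
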