You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by pv...@apache.org on 2022/01/25 17:12:25 UTC

[nifi] branch main updated: NIFI-9589: Support initial loading from the current max values in QueryDatabaseTable* processors

This is an automated email from the ASF dual-hosted git repository.

pvillard pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new 96b53d8  NIFI-9589: Support initial loading from the current max values in QueryDatabaseTable* processors
96b53d8 is described below

commit 96b53d894375e98370d2bc41c8f4b3a9a1f1881d
Author: Peter Turcsanyi <tu...@apache.org>
AuthorDate: Wed Jan 19 04:07:57 2022 +0100

    NIFI-9589: Support initial loading from the current max values in QueryDatabaseTable* processors
    
    Signed-off-by: Pierre Villard <pi...@gmail.com>
    
    This closes #5676.
---
 .../standard/AbstractQueryDatabaseTable.java       |  69 +++++++++++++-
 .../processors/standard/QueryDatabaseTable.java    |   1 +
 .../standard/QueryDatabaseTableRecord.java         |   1 +
 .../standard/QueryDatabaseTableRecordTest.java     | 102 ++++++++++++++++++++
 .../standard/QueryDatabaseTableTest.java           | 104 +++++++++++++++++++++
 5 files changed, 276 insertions(+), 1 deletion(-)

diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractQueryDatabaseTable.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractQueryDatabaseTable.java
index 2aa39e0..03df629 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractQueryDatabaseTable.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractQueryDatabaseTable.java
@@ -21,6 +21,8 @@ import org.apache.nifi.annotation.lifecycle.OnScheduled;
 import org.apache.nifi.annotation.lifecycle.OnStopped;
 import org.apache.nifi.components.AllowableValue;
 import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
 import org.apache.nifi.components.state.Scope;
 import org.apache.nifi.components.state.StateMap;
 import org.apache.nifi.dbcp.DBCPService;
@@ -49,6 +51,7 @@ import java.sql.Statement;
 import java.text.ParseException;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
@@ -57,6 +60,7 @@ import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 
 
@@ -141,6 +145,20 @@ public abstract class AbstractQueryDatabaseTable extends AbstractDatabaseFetchPr
             .allowableValues(TRANSACTION_NONE,TRANSACTION_READ_COMMITTED, TRANSACTION_READ_UNCOMMITTED, TRANSACTION_REPEATABLE_READ, TRANSACTION_SERIALIZABLE)
             .build();
 
+    public static final AllowableValue INITIAL_LOAD_STRATEGY_ALL_ROWS = new AllowableValue("Start at Beginning", "Start at Beginning", "Loads all existing rows from the database table.");
+    public static final AllowableValue INITIAL_LOAD_STRATEGY_NEW_ROWS = new AllowableValue("Start at Current Maximum Values", "Start at Current Maximum Values", "Loads only the newly " +
+            "inserted or updated rows based on the maximum value(s) of the column(s) configured in the '" + MAX_VALUE_COLUMN_NAMES.getDisplayName() + "' property.");
+
+    public static final PropertyDescriptor INITIAL_LOAD_STRATEGY = new PropertyDescriptor.Builder()
+            .name("initial-load-strategy")
+            .displayName("Initial Load Strategy")
+            .description("How to handle existing rows in the database table when the processor is started for the first time (or its state has been cleared). The property will be ignored, " +
+                    "if any '" + INITIAL_MAX_VALUE_PROP_START + "*' dynamic property has also been configured.")
+            .required(true)
+            .allowableValues(INITIAL_LOAD_STRATEGY_ALL_ROWS, INITIAL_LOAD_STRATEGY_NEW_ROWS)
+            .defaultValue(INITIAL_LOAD_STRATEGY_ALL_ROWS.getValue())
+            .build();
+
     @Override
     public Set<Relationship> getRelationships() {
         return relationships;
@@ -163,6 +181,24 @@ public abstract class AbstractQueryDatabaseTable extends AbstractDatabaseFetchPr
                 .build();
     }
 
+    @Override
+    protected Collection<ValidationResult> customValidate(ValidationContext validationContext) {
+        final List<ValidationResult> results = new ArrayList<>(super.customValidate(validationContext));
+
+        final boolean maxValueColumnNames = validationContext.getProperty(MAX_VALUE_COLUMN_NAMES).isSet();
+        final String initialLoadStrategy = validationContext.getProperty(INITIAL_LOAD_STRATEGY).getValue();
+        if (!maxValueColumnNames && initialLoadStrategy.equals(INITIAL_LOAD_STRATEGY_NEW_ROWS.getValue())) {
+            results.add(new ValidationResult.Builder().valid(false)
+                    .subject(INITIAL_LOAD_STRATEGY.getDisplayName())
+                    .input(INITIAL_LOAD_STRATEGY_NEW_ROWS.getDisplayName())
+                    .explanation(String.format("'%s' strategy can only be used when '%s' property is also configured",
+                            INITIAL_LOAD_STRATEGY_NEW_ROWS.getDisplayName(), MAX_VALUE_COLUMN_NAMES.getDisplayName()))
+                    .build());
+        }
+
+        return results;
+    }
+
     @OnScheduled
     public void setup(final ProcessContext context) {
         maxValueProperties = getDefaultMaxValueProperties(context, null);
@@ -191,7 +227,9 @@ public abstract class AbstractQueryDatabaseTable extends AbstractDatabaseFetchPr
         final String columnNames = context.getProperty(COLUMN_NAMES).evaluateAttributeExpressions().getValue();
         final String sqlQuery = context.getProperty(SQL_QUERY).evaluateAttributeExpressions().getValue();
         final String maxValueColumnNames = context.getProperty(MAX_VALUE_COLUMN_NAMES).evaluateAttributeExpressions().getValue();
+        final String initialLoadStrategy = context.getProperty(INITIAL_LOAD_STRATEGY).getValue();
         final String customWhereClause = context.getProperty(WHERE_CLAUSE).evaluateAttributeExpressions().getValue();
+        final Integer queryTimeout = context.getProperty(QUERY_TIMEOUT).evaluateAttributeExpressions().asTimePeriod(TimeUnit.SECONDS).intValue();
         final Integer fetchSize = context.getProperty(FETCH_SIZE).evaluateAttributeExpressions().asInteger();
         final Integer maxRowsPerFlowFile = context.getProperty(MAX_ROWS_PER_FLOW_FILE).evaluateAttributeExpressions().asInteger();
         final Integer outputBatchSizeField = context.getProperty(OUTPUT_BATCH_SIZE).evaluateAttributeExpressions().asInteger();
@@ -241,6 +279,36 @@ public abstract class AbstractQueryDatabaseTable extends AbstractDatabaseFetchPr
         List<String> maxValueColumnNameList = StringUtils.isEmpty(maxValueColumnNames)
                 ? null
                 : Arrays.asList(maxValueColumnNames.split("\\s*,\\s*"));
+
+        if (maxValueColumnNameList != null && statePropertyMap.isEmpty() && initialLoadStrategy.equals(INITIAL_LOAD_STRATEGY_NEW_ROWS.getValue())) {
+            final String columnsClause = maxValueColumnNameList.stream()
+                    .map(columnName -> String.format("MAX(%s) %s", columnName, columnName))
+                    .collect(Collectors.joining(", "));
+
+            final String selectMaxQuery = dbAdapter.getSelectStatement(tableName, columnsClause, null, null, null, null);
+
+            try (final Connection con = dbcpService.getConnection(Collections.emptyMap());
+                 final Statement st = con.createStatement()) {
+
+                if (transIsolationLevel != null) {
+                    con.setTransactionIsolation(transIsolationLevel);
+                }
+
+                st.setQueryTimeout(queryTimeout); // timeout in seconds
+
+                try (final ResultSet resultSet = st.executeQuery(selectMaxQuery)) {
+                    if (resultSet.next()) {
+                        final MaxValueResultSetRowCollector maxValCollector = new MaxValueResultSetRowCollector(tableName, statePropertyMap, dbAdapter);
+                        maxValCollector.processRow(resultSet);
+                        maxValCollector.applyStateChanges();
+                    }
+                }
+            } catch (final Exception e) {
+                logger.error("Unable to execute SQL select query {} due to {}", new Object[]{selectMaxQuery, e});
+                context.yield();
+            }
+        }
+
         final String selectQuery = getQuery(dbAdapter, tableName, sqlQuery, columnNames, maxValueColumnNameList, customWhereClause, statePropertyMap);
         final StopWatch stopWatch = new StopWatch(true);
         final String fragmentIdentifier = UUID.randomUUID().toString();
@@ -271,7 +339,6 @@ public abstract class AbstractQueryDatabaseTable extends AbstractDatabaseFetchPr
                 // Ignore and use default JDBC URL. This shouldn't happen unless the driver doesn't implement getMetaData() properly
             }
 
-            final Integer queryTimeout = context.getProperty(QUERY_TIMEOUT).evaluateAttributeExpressions().asTimePeriod(TimeUnit.SECONDS).intValue();
             st.setQueryTimeout(queryTimeout); // timeout in seconds
             if (logger.isDebugEnabled()) {
                 logger.debug("Executing query {}", new Object[] { selectQuery });
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryDatabaseTable.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryDatabaseTable.java
index d650386..1af871a 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryDatabaseTable.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryDatabaseTable.java
@@ -103,6 +103,7 @@ public class QueryDatabaseTable extends AbstractQueryDatabaseTable {
         pds.add(WHERE_CLAUSE);
         pds.add(SQL_QUERY);
         pds.add(MAX_VALUE_COLUMN_NAMES);
+        pds.add(INITIAL_LOAD_STRATEGY);
         pds.add(QUERY_TIMEOUT);
         pds.add(FETCH_SIZE);
         pds.add(MAX_ROWS_PER_FLOW_FILE);
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryDatabaseTableRecord.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryDatabaseTableRecord.java
index 8a2fd93..4f5a3e5 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryDatabaseTableRecord.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryDatabaseTableRecord.java
@@ -125,6 +125,7 @@ public class QueryDatabaseTableRecord extends AbstractQueryDatabaseTable {
         pds.add(SQL_QUERY);
         pds.add(RECORD_WRITER_FACTORY);
         pds.add(MAX_VALUE_COLUMN_NAMES);
+        pds.add(INITIAL_LOAD_STRATEGY);
         pds.add(QUERY_TIMEOUT);
         pds.add(FETCH_SIZE);
         pds.add(MAX_ROWS_PER_FLOW_FILE);
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/QueryDatabaseTableRecordTest.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/QueryDatabaseTableRecordTest.java
index 3f8a489..f46dcc5 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/QueryDatabaseTableRecordTest.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/QueryDatabaseTableRecordTest.java
@@ -949,6 +949,108 @@ public class QueryDatabaseTableRecordTest {
     }
 
     @Test
+    public void testInitialLoadStrategyStartAtBeginning() throws SQLException {
+
+        // load test data to database
+        final Connection con = ((DBCPService) runner.getControllerService("dbcp")).getConnection();
+        Statement stmt = con.createStatement();
+
+        try {
+            stmt.execute("drop table TEST_QUERY_DB_TABLE");
+        } catch (final SQLException sqle) {
+            // Ignore this error, probably a "table does not exist" since Derby doesn't yet support DROP IF EXISTS [DERBY-4842]
+        }
+
+        stmt.execute("create table TEST_QUERY_DB_TABLE (id integer not null, name varchar(100), scale float, created_on timestamp, bignum bigint default 0)");
+
+        Calendar cal = Calendar.getInstance(TimeZone.getTimeZone("UTC"));
+        cal.setTimeInMillis(0);
+
+        SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
+        dateFormat.setTimeZone(TimeZone.getTimeZone("UTC"));
+
+        int rowCount = 0;
+        // create 10 rows whose created_on values are one minute apart
+        for (int batch = 0; batch < 10; batch++) {
+            stmt.execute("insert into TEST_QUERY_DB_TABLE (id, name, scale, created_on) VALUES (" + rowCount + ", 'Joe Smith', 1.0, '" + dateFormat.format(cal.getTime().getTime()) + "')");
+
+            rowCount++;
+            cal.add(Calendar.MINUTE, 1);
+        }
+
+        runner.setProperty(QueryDatabaseTableRecord.TABLE_NAME, "${" + TABLE_NAME_KEY + "}");
+        runner.setVariable(TABLE_NAME_KEY, "TEST_QUERY_DB_TABLE");
+        runner.setIncomingConnection(false);
+        runner.setProperty(QueryDatabaseTableRecord.MAX_VALUE_COLUMN_NAMES, "created_on");
+        runner.setProperty(QueryDatabaseTableRecord.INITIAL_LOAD_STRATEGY, QueryDatabaseTableRecord.INITIAL_LOAD_STRATEGY_ALL_ROWS.getValue());
+
+        // Initial run with no previous state. Should get all 10 records
+        runner.run();
+        runner.assertAllFlowFilesTransferred(QueryDatabaseTableRecord.REL_SUCCESS, 1);
+        MockFlowFile flowFile = runner.getFlowFilesForRelationship(QueryDatabaseTableRecord.REL_SUCCESS).get(0);
+        flowFile.assertAttributeEquals("record.count", "10");
+        runner.getStateManager().assertStateEquals("test_query_db_table" + AbstractDatabaseFetchProcessor.NAMESPACE_DELIMITER + "created_on", "1970-01-01 00:09:00.0", Scope.CLUSTER);
+        runner.clearTransferState();
+
+        // Run again, this time no flowfiles/rows should be transferred
+        // Also validate that the Max Value does not change
+        runner.run();
+        runner.assertAllFlowFilesTransferred(QueryDatabaseTableRecord.REL_SUCCESS, 0);
+        runner.getStateManager().assertStateEquals("test_query_db_table" + AbstractDatabaseFetchProcessor.NAMESPACE_DELIMITER + "created_on", "1970-01-01 00:09:00.0", Scope.CLUSTER);
+        runner.clearTransferState();
+    }
+
+    @Test
+    public void testInitialLoadStrategyStartAtCurrentMaximumValues() throws SQLException {
+
+        // load test data to database
+        final Connection con = ((DBCPService) runner.getControllerService("dbcp")).getConnection();
+        Statement stmt = con.createStatement();
+
+        try {
+            stmt.execute("drop table TEST_QUERY_DB_TABLE");
+        } catch (final SQLException sqle) {
+            // Ignore this error, probably a "table does not exist" since Derby doesn't yet support DROP IF EXISTS [DERBY-4842]
+        }
+
+        stmt.execute("create table TEST_QUERY_DB_TABLE (id integer not null, name varchar(100), scale float, created_on timestamp, bignum bigint default 0)");
+
+        Calendar cal = Calendar.getInstance(TimeZone.getTimeZone("UTC"));
+        cal.setTimeInMillis(0);
+
+        SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
+        dateFormat.setTimeZone(TimeZone.getTimeZone("UTC"));
+
+        int rowCount = 0;
+        // create 10 rows whose created_on values are one minute apart
+        for (int batch = 0; batch < 10; batch++) {
+            stmt.execute("insert into TEST_QUERY_DB_TABLE (id, name, scale, created_on) VALUES (" + rowCount + ", 'Joe Smith', 1.0, '" + dateFormat.format(cal.getTime().getTime()) + "')");
+
+            rowCount++;
+            cal.add(Calendar.MINUTE, 1);
+        }
+
+        runner.setProperty(QueryDatabaseTableRecord.TABLE_NAME, "${" + TABLE_NAME_KEY + "}");
+        runner.setVariable(TABLE_NAME_KEY, "TEST_QUERY_DB_TABLE");
+        runner.setIncomingConnection(false);
+        runner.setProperty(QueryDatabaseTableRecord.MAX_VALUE_COLUMN_NAMES, "created_on");
+        runner.setProperty(QueryDatabaseTableRecord.INITIAL_LOAD_STRATEGY, QueryDatabaseTableRecord.INITIAL_LOAD_STRATEGY_NEW_ROWS.getValue());
+
+        // Initial run with no previous state. Should not get any records but store Max Value in the state
+        runner.run();
+        runner.assertAllFlowFilesTransferred(QueryDatabaseTableRecord.REL_SUCCESS, 0);
+        runner.getStateManager().assertStateEquals("test_query_db_table" + AbstractDatabaseFetchProcessor.NAMESPACE_DELIMITER + "created_on", "1970-01-01 00:09:00.0", Scope.CLUSTER);
+        runner.clearTransferState();
+
+        // Run again: no new rows were inserted, so still no flowfiles/rows should be transferred
+        // Also validate that the Max Value does not change
+        runner.run();
+        runner.assertAllFlowFilesTransferred(QueryDatabaseTableRecord.REL_SUCCESS, 0);
+        runner.getStateManager().assertStateEquals("test_query_db_table" + AbstractDatabaseFetchProcessor.NAMESPACE_DELIMITER + "created_on", "1970-01-01 00:09:00.0", Scope.CLUSTER);
+        runner.clearTransferState();
+    }
+
+    @Test
     public void testAddedRowsCustomWhereClause() throws SQLException, IOException {
 
         // load test data to database
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/QueryDatabaseTableTest.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/QueryDatabaseTableTest.java
index 8b51fe2..d7eec01 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/QueryDatabaseTableTest.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/QueryDatabaseTableTest.java
@@ -970,6 +970,110 @@ public class QueryDatabaseTableTest {
     }
 
     @Test
+    public void testInitialLoadStrategyStartAtBeginning() throws SQLException, IOException {
+
+        // load test data to database
+        final Connection con = ((DBCPService) runner.getControllerService("dbcp")).getConnection();
+        Statement stmt = con.createStatement();
+        InputStream in;
+
+        try {
+            stmt.execute("drop table TEST_QUERY_DB_TABLE");
+        } catch (final SQLException sqle) {
+            // Ignore this error, probably a "table does not exist" since Derby doesn't yet support DROP IF EXISTS [DERBY-4842]
+        }
+
+        stmt.execute("create table TEST_QUERY_DB_TABLE (id integer not null, name varchar(100), scale float, created_on timestamp, bignum bigint default 0)");
+
+        Calendar cal = Calendar.getInstance(TimeZone.getTimeZone("UTC"));
+        cal.setTimeInMillis(0);
+
+        SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
+        dateFormat.setTimeZone(TimeZone.getTimeZone("UTC"));
+
+        int rowCount=0;
+        // create 10 rows whose created_on values are one minute apart
+        for(int batch=0;batch<10;batch++){
+            stmt.execute("insert into TEST_QUERY_DB_TABLE (id, name, scale, created_on) VALUES (" + rowCount + ", 'Joe Smith', 1.0, '" + dateFormat.format(cal.getTime().getTime()) + "')");
+
+            rowCount++;
+            cal.add(Calendar.MINUTE, 1);
+        }
+
+        runner.setProperty(QueryDatabaseTable.TABLE_NAME, "${" + TABLE_NAME_KEY + "}");
+        runner.setVariable(TABLE_NAME_KEY, "TEST_QUERY_DB_TABLE");
+        runner.setIncomingConnection(false);
+        runner.setProperty(QueryDatabaseTable.MAX_VALUE_COLUMN_NAMES, "created_on");
+        runner.setProperty(QueryDatabaseTable.INITIAL_LOAD_STRATEGY, QueryDatabaseTable.INITIAL_LOAD_STRATEGY_ALL_ROWS.getValue());
+
+        // Initial run with no previous state. Should get all 10 records
+        runner.run();
+        runner.assertAllFlowFilesTransferred(QueryDatabaseTable.REL_SUCCESS, 1);
+        in = new ByteArrayInputStream(runner.getFlowFilesForRelationship(QueryDatabaseTable.REL_SUCCESS).get(0).toByteArray());
+        assertEquals(10, getNumberOfRecordsFromStream(in));
+        runner.getStateManager().assertStateEquals("test_query_db_table" + AbstractDatabaseFetchProcessor.NAMESPACE_DELIMITER + "created_on", "1970-01-01 00:09:00.0", Scope.CLUSTER);
+        runner.clearTransferState();
+
+        // Run again, this time no flowfiles/rows should be transferred
+        // Also validate that the Max Value does not change
+        runner.run();
+        runner.assertAllFlowFilesTransferred(QueryDatabaseTable.REL_SUCCESS, 0);
+        runner.getStateManager().assertStateEquals("test_query_db_table" + AbstractDatabaseFetchProcessor.NAMESPACE_DELIMITER + "created_on", "1970-01-01 00:09:00.0", Scope.CLUSTER);
+        runner.clearTransferState();
+    }
+
+    @Test
+    public void testInitialLoadStrategyStartAtCurrentMaximumValues() throws SQLException, IOException {
+
+        // load test data to database
+        final Connection con = ((DBCPService) runner.getControllerService("dbcp")).getConnection();
+        Statement stmt = con.createStatement();
+        InputStream in;
+
+        try {
+            stmt.execute("drop table TEST_QUERY_DB_TABLE");
+        } catch (final SQLException sqle) {
+            // Ignore this error, probably a "table does not exist" since Derby doesn't yet support DROP IF EXISTS [DERBY-4842]
+        }
+
+        stmt.execute("create table TEST_QUERY_DB_TABLE (id integer not null, name varchar(100), scale float, created_on timestamp, bignum bigint default 0)");
+
+        Calendar cal = Calendar.getInstance(TimeZone.getTimeZone("UTC"));
+        cal.setTimeInMillis(0);
+
+        SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
+        dateFormat.setTimeZone(TimeZone.getTimeZone("UTC"));
+
+        int rowCount=0;
+        // create 10 rows whose created_on values are one minute apart
+        for(int batch=0;batch<10;batch++){
+            stmt.execute("insert into TEST_QUERY_DB_TABLE (id, name, scale, created_on) VALUES (" + rowCount + ", 'Joe Smith', 1.0, '" + dateFormat.format(cal.getTime().getTime()) + "')");
+
+            rowCount++;
+            cal.add(Calendar.MINUTE, 1);
+        }
+
+        runner.setProperty(QueryDatabaseTable.TABLE_NAME, "${" + TABLE_NAME_KEY + "}");
+        runner.setVariable(TABLE_NAME_KEY, "TEST_QUERY_DB_TABLE");
+        runner.setIncomingConnection(false);
+        runner.setProperty(QueryDatabaseTable.MAX_VALUE_COLUMN_NAMES, "created_on");
+        runner.setProperty(QueryDatabaseTable.INITIAL_LOAD_STRATEGY, QueryDatabaseTable.INITIAL_LOAD_STRATEGY_NEW_ROWS.getValue());
+
+        // Initial run with no previous state. Should not get any records but store Max Value in the state
+        runner.run();
+        runner.assertAllFlowFilesTransferred(QueryDatabaseTable.REL_SUCCESS, 0);
+        runner.getStateManager().assertStateEquals("test_query_db_table" + AbstractDatabaseFetchProcessor.NAMESPACE_DELIMITER + "created_on", "1970-01-01 00:09:00.0", Scope.CLUSTER);
+        runner.clearTransferState();
+
+        // Run again: no new rows were inserted, so still no flowfiles/rows should be transferred
+        // Also validate that the Max Value does not change
+        runner.run();
+        runner.assertAllFlowFilesTransferred(QueryDatabaseTable.REL_SUCCESS, 0);
+        runner.getStateManager().assertStateEquals("test_query_db_table" + AbstractDatabaseFetchProcessor.NAMESPACE_DELIMITER + "created_on", "1970-01-01 00:09:00.0", Scope.CLUSTER);
+        runner.clearTransferState();
+    }
+
+    @Test
     public void testAddedRowsCustomWhereClause() throws ClassNotFoundException, SQLException, InitializationException, IOException {
 
         // load test data to database