You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by pv...@apache.org on 2022/01/25 17:12:25 UTC
[nifi] branch main updated: NIFI-9589: Support initial loading from the current max values in QueryDatabaseTable* processors
This is an automated email from the ASF dual-hosted git repository.
pvillard pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new 96b53d8 NIFI-9589: Support initial loading from the current max values in QueryDatabaseTable* processors
96b53d8 is described below
commit 96b53d894375e98370d2bc41c8f4b3a9a1f1881d
Author: Peter Turcsanyi <tu...@apache.org>
AuthorDate: Wed Jan 19 04:07:57 2022 +0100
NIFI-9589: Support initial loading from the current max values in QueryDatabaseTable* processors
Signed-off-by: Pierre Villard <pi...@gmail.com>
This closes #5676.
---
.../standard/AbstractQueryDatabaseTable.java | 69 +++++++++++++-
.../processors/standard/QueryDatabaseTable.java | 1 +
.../standard/QueryDatabaseTableRecord.java | 1 +
.../standard/QueryDatabaseTableRecordTest.java | 102 ++++++++++++++++++++
.../standard/QueryDatabaseTableTest.java | 104 +++++++++++++++++++++
5 files changed, 276 insertions(+), 1 deletion(-)
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractQueryDatabaseTable.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractQueryDatabaseTable.java
index 2aa39e0..03df629 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractQueryDatabaseTable.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractQueryDatabaseTable.java
@@ -21,6 +21,8 @@ import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.annotation.lifecycle.OnStopped;
import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.components.state.Scope;
import org.apache.nifi.components.state.StateMap;
import org.apache.nifi.dbcp.DBCPService;
@@ -49,6 +51,7 @@ import java.sql.Statement;
import java.text.ParseException;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
@@ -57,6 +60,7 @@ import java.util.Set;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
+import java.util.stream.Collectors;
import java.util.stream.IntStream;
@@ -141,6 +145,20 @@ public abstract class AbstractQueryDatabaseTable extends AbstractDatabaseFetchPr
.allowableValues(TRANSACTION_NONE,TRANSACTION_READ_COMMITTED, TRANSACTION_READ_UNCOMMITTED, TRANSACTION_REPEATABLE_READ, TRANSACTION_SERIALIZABLE)
.build();
+ public static final AllowableValue INITIAL_LOAD_STRATEGY_ALL_ROWS = new AllowableValue("Start at Beginning", "Start at Beginning", "Loads all existing rows from the database table.");
+ public static final AllowableValue INITIAL_LOAD_STRATEGY_NEW_ROWS = new AllowableValue("Start at Current Maximum Values", "Start at Current Maximum Values", "Loads only the newly " +
+ "inserted or updated rows based on the maximum value(s) of the column(s) configured in the '" + MAX_VALUE_COLUMN_NAMES.getDisplayName() + "' property.");
+
+ public static final PropertyDescriptor INITIAL_LOAD_STRATEGY = new PropertyDescriptor.Builder()
+ .name("initial-load-strategy")
+ .displayName("Initial Load Strategy")
+ .description("How to handle existing rows in the database table when the processor is started for the first time (or its state has been cleared). The property will be ignored, " +
+ "if any '" + INITIAL_MAX_VALUE_PROP_START + "*' dynamic property has also been configured.")
+ .required(true)
+ .allowableValues(INITIAL_LOAD_STRATEGY_ALL_ROWS, INITIAL_LOAD_STRATEGY_NEW_ROWS)
+ .defaultValue(INITIAL_LOAD_STRATEGY_ALL_ROWS.getValue())
+ .build();
+
@Override
public Set<Relationship> getRelationships() {
return relationships;
@@ -163,6 +181,24 @@ public abstract class AbstractQueryDatabaseTable extends AbstractDatabaseFetchPr
.build();
}
+ @Override
+ protected Collection<ValidationResult> customValidate(ValidationContext validationContext) {
+ final List<ValidationResult> results = new ArrayList<>(super.customValidate(validationContext));
+
+ final boolean maxValueColumnNames = validationContext.getProperty(MAX_VALUE_COLUMN_NAMES).isSet();
+ final String initialLoadStrategy = validationContext.getProperty(INITIAL_LOAD_STRATEGY).getValue();
+ if (!maxValueColumnNames && initialLoadStrategy.equals(INITIAL_LOAD_STRATEGY_NEW_ROWS.getValue())) {
+ results.add(new ValidationResult.Builder().valid(false)
+ .subject(INITIAL_LOAD_STRATEGY.getDisplayName())
+ .input(INITIAL_LOAD_STRATEGY_NEW_ROWS.getDisplayName())
+ .explanation(String.format("'%s' strategy can only be used when '%s' property is also configured",
+ INITIAL_LOAD_STRATEGY_NEW_ROWS.getDisplayName(), MAX_VALUE_COLUMN_NAMES.getDisplayName()))
+ .build());
+ }
+
+ return results;
+ }
+
@OnScheduled
public void setup(final ProcessContext context) {
maxValueProperties = getDefaultMaxValueProperties(context, null);
@@ -191,7 +227,9 @@ public abstract class AbstractQueryDatabaseTable extends AbstractDatabaseFetchPr
final String columnNames = context.getProperty(COLUMN_NAMES).evaluateAttributeExpressions().getValue();
final String sqlQuery = context.getProperty(SQL_QUERY).evaluateAttributeExpressions().getValue();
final String maxValueColumnNames = context.getProperty(MAX_VALUE_COLUMN_NAMES).evaluateAttributeExpressions().getValue();
+ final String initialLoadStrategy = context.getProperty(INITIAL_LOAD_STRATEGY).getValue();
final String customWhereClause = context.getProperty(WHERE_CLAUSE).evaluateAttributeExpressions().getValue();
+ final Integer queryTimeout = context.getProperty(QUERY_TIMEOUT).evaluateAttributeExpressions().asTimePeriod(TimeUnit.SECONDS).intValue();
final Integer fetchSize = context.getProperty(FETCH_SIZE).evaluateAttributeExpressions().asInteger();
final Integer maxRowsPerFlowFile = context.getProperty(MAX_ROWS_PER_FLOW_FILE).evaluateAttributeExpressions().asInteger();
final Integer outputBatchSizeField = context.getProperty(OUTPUT_BATCH_SIZE).evaluateAttributeExpressions().asInteger();
@@ -241,6 +279,36 @@ public abstract class AbstractQueryDatabaseTable extends AbstractDatabaseFetchPr
List<String> maxValueColumnNameList = StringUtils.isEmpty(maxValueColumnNames)
? null
: Arrays.asList(maxValueColumnNames.split("\\s*,\\s*"));
+
+ if (maxValueColumnNameList != null && statePropertyMap.isEmpty() && initialLoadStrategy.equals(INITIAL_LOAD_STRATEGY_NEW_ROWS.getValue())) {
+ final String columnsClause = maxValueColumnNameList.stream()
+ .map(columnName -> String.format("MAX(%s) %s", columnName, columnName))
+ .collect(Collectors.joining(", "));
+
+ final String selectMaxQuery = dbAdapter.getSelectStatement(tableName, columnsClause, null, null, null, null);
+
+ try (final Connection con = dbcpService.getConnection(Collections.emptyMap());
+ final Statement st = con.createStatement()) {
+
+ if (transIsolationLevel != null) {
+ con.setTransactionIsolation(transIsolationLevel);
+ }
+
+ st.setQueryTimeout(queryTimeout); // timeout in seconds
+
+ try (final ResultSet resultSet = st.executeQuery(selectMaxQuery)) {
+ if (resultSet.next()) {
+ final MaxValueResultSetRowCollector maxValCollector = new MaxValueResultSetRowCollector(tableName, statePropertyMap, dbAdapter);
+ maxValCollector.processRow(resultSet);
+ maxValCollector.applyStateChanges();
+ }
+ }
+ } catch (final Exception e) {
+ logger.error("Unable to execute SQL select query {} due to {}", new Object[]{selectMaxQuery, e});
+ context.yield();
+ }
+ }
+
final String selectQuery = getQuery(dbAdapter, tableName, sqlQuery, columnNames, maxValueColumnNameList, customWhereClause, statePropertyMap);
final StopWatch stopWatch = new StopWatch(true);
final String fragmentIdentifier = UUID.randomUUID().toString();
@@ -271,7 +339,6 @@ public abstract class AbstractQueryDatabaseTable extends AbstractDatabaseFetchPr
// Ignore and use default JDBC URL. This shouldn't happen unless the driver doesn't implement getMetaData() properly
}
- final Integer queryTimeout = context.getProperty(QUERY_TIMEOUT).evaluateAttributeExpressions().asTimePeriod(TimeUnit.SECONDS).intValue();
st.setQueryTimeout(queryTimeout); // timeout in seconds
if (logger.isDebugEnabled()) {
logger.debug("Executing query {}", new Object[] { selectQuery });
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryDatabaseTable.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryDatabaseTable.java
index d650386..1af871a 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryDatabaseTable.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryDatabaseTable.java
@@ -103,6 +103,7 @@ public class QueryDatabaseTable extends AbstractQueryDatabaseTable {
pds.add(WHERE_CLAUSE);
pds.add(SQL_QUERY);
pds.add(MAX_VALUE_COLUMN_NAMES);
+ pds.add(INITIAL_LOAD_STRATEGY);
pds.add(QUERY_TIMEOUT);
pds.add(FETCH_SIZE);
pds.add(MAX_ROWS_PER_FLOW_FILE);
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryDatabaseTableRecord.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryDatabaseTableRecord.java
index 8a2fd93..4f5a3e5 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryDatabaseTableRecord.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryDatabaseTableRecord.java
@@ -125,6 +125,7 @@ public class QueryDatabaseTableRecord extends AbstractQueryDatabaseTable {
pds.add(SQL_QUERY);
pds.add(RECORD_WRITER_FACTORY);
pds.add(MAX_VALUE_COLUMN_NAMES);
+ pds.add(INITIAL_LOAD_STRATEGY);
pds.add(QUERY_TIMEOUT);
pds.add(FETCH_SIZE);
pds.add(MAX_ROWS_PER_FLOW_FILE);
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/QueryDatabaseTableRecordTest.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/QueryDatabaseTableRecordTest.java
index 3f8a489..f46dcc5 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/QueryDatabaseTableRecordTest.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/QueryDatabaseTableRecordTest.java
@@ -949,6 +949,108 @@ public class QueryDatabaseTableRecordTest {
}
@Test
+ public void testInitialLoadStrategyStartAtBeginning() throws SQLException {
+
+ // load test data to database
+ final Connection con = ((DBCPService) runner.getControllerService("dbcp")).getConnection();
+ Statement stmt = con.createStatement();
+
+ try {
+ stmt.execute("drop table TEST_QUERY_DB_TABLE");
+ } catch (final SQLException sqle) {
+ // Ignore this error, probably a "table does not exist" since Derby doesn't yet support DROP IF EXISTS [DERBY-4842]
+ }
+
+ stmt.execute("create table TEST_QUERY_DB_TABLE (id integer not null, name varchar(100), scale float, created_on timestamp, bignum bigint default 0)");
+
+ Calendar cal = Calendar.getInstance(TimeZone.getTimeZone("UTC"));
+ cal.setTimeInMillis(0);
+
+ SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
+ dateFormat.setTimeZone(TimeZone.getTimeZone("UTC"));
+
+ int rowCount = 0;
+ //create larger row set
+ for (int batch = 0; batch < 10; batch++) {
+ stmt.execute("insert into TEST_QUERY_DB_TABLE (id, name, scale, created_on) VALUES (" + rowCount + ", 'Joe Smith', 1.0, '" + dateFormat.format(cal.getTime().getTime()) + "')");
+
+ rowCount++;
+ cal.add(Calendar.MINUTE, 1);
+ }
+
+ runner.setProperty(QueryDatabaseTableRecord.TABLE_NAME, "${" + TABLE_NAME_KEY + "}");
+ runner.setVariable(TABLE_NAME_KEY, "TEST_QUERY_DB_TABLE");
+ runner.setIncomingConnection(false);
+ runner.setProperty(QueryDatabaseTableRecord.MAX_VALUE_COLUMN_NAMES, "created_on");
+ runner.setProperty(QueryDatabaseTableRecord.INITIAL_LOAD_STRATEGY, QueryDatabaseTableRecord.INITIAL_LOAD_STRATEGY_ALL_ROWS.getValue());
+
+ // Initial run with no previous state. Should get all 10 records
+ runner.run();
+ runner.assertAllFlowFilesTransferred(QueryDatabaseTableRecord.REL_SUCCESS, 1);
+ MockFlowFile flowFile = runner.getFlowFilesForRelationship(QueryDatabaseTableRecord.REL_SUCCESS).get(0);
+ flowFile.assertAttributeEquals("record.count", "10");
+ runner.getStateManager().assertStateEquals("test_query_db_table" + AbstractDatabaseFetchProcessor.NAMESPACE_DELIMITER + "created_on", "1970-01-01 00:09:00.0", Scope.CLUSTER);
+ runner.clearTransferState();
+
+ // Run again, this time no flowfiles/rows should be transferred
+ // Validate Max Value doesn't change also
+ runner.run();
+ runner.assertAllFlowFilesTransferred(QueryDatabaseTableRecord.REL_SUCCESS, 0);
+ runner.getStateManager().assertStateEquals("test_query_db_table" + AbstractDatabaseFetchProcessor.NAMESPACE_DELIMITER + "created_on", "1970-01-01 00:09:00.0", Scope.CLUSTER);
+ runner.clearTransferState();
+ }
+
+ @Test
+ public void testInitialLoadStrategyStartAtCurrentMaximumValues() throws SQLException {
+
+ // load test data to database
+ final Connection con = ((DBCPService) runner.getControllerService("dbcp")).getConnection();
+ Statement stmt = con.createStatement();
+
+ try {
+ stmt.execute("drop table TEST_QUERY_DB_TABLE");
+ } catch (final SQLException sqle) {
+ // Ignore this error, probably a "table does not exist" since Derby doesn't yet support DROP IF EXISTS [DERBY-4842]
+ }
+
+ stmt.execute("create table TEST_QUERY_DB_TABLE (id integer not null, name varchar(100), scale float, created_on timestamp, bignum bigint default 0)");
+
+ Calendar cal = Calendar.getInstance(TimeZone.getTimeZone("UTC"));
+ cal.setTimeInMillis(0);
+
+ SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
+ dateFormat.setTimeZone(TimeZone.getTimeZone("UTC"));
+
+ int rowCount = 0;
+ //create larger row set
+ for (int batch = 0; batch < 10; batch++) {
+ stmt.execute("insert into TEST_QUERY_DB_TABLE (id, name, scale, created_on) VALUES (" + rowCount + ", 'Joe Smith', 1.0, '" + dateFormat.format(cal.getTime().getTime()) + "')");
+
+ rowCount++;
+ cal.add(Calendar.MINUTE, 1);
+ }
+
+ runner.setProperty(QueryDatabaseTableRecord.TABLE_NAME, "${" + TABLE_NAME_KEY + "}");
+ runner.setVariable(TABLE_NAME_KEY, "TEST_QUERY_DB_TABLE");
+ runner.setIncomingConnection(false);
+ runner.setProperty(QueryDatabaseTableRecord.MAX_VALUE_COLUMN_NAMES, "created_on");
+ runner.setProperty(QueryDatabaseTableRecord.INITIAL_LOAD_STRATEGY, QueryDatabaseTableRecord.INITIAL_LOAD_STRATEGY_NEW_ROWS.getValue());
+
+ // Initial run with no previous state. Should not get any records but store Max Value in the state
+ runner.run();
+ runner.assertAllFlowFilesTransferred(QueryDatabaseTableRecord.REL_SUCCESS, 0);
+ runner.getStateManager().assertStateEquals("test_query_db_table" + AbstractDatabaseFetchProcessor.NAMESPACE_DELIMITER + "created_on", "1970-01-01 00:09:00.0", Scope.CLUSTER);
+ runner.clearTransferState();
+
+ // Run again, this time no flowfiles/rows should be transferred
+ // Validate Max Value doesn't change also
+ runner.run();
+ runner.assertAllFlowFilesTransferred(QueryDatabaseTableRecord.REL_SUCCESS, 0);
+ runner.getStateManager().assertStateEquals("test_query_db_table" + AbstractDatabaseFetchProcessor.NAMESPACE_DELIMITER + "created_on", "1970-01-01 00:09:00.0", Scope.CLUSTER);
+ runner.clearTransferState();
+ }
+
+ @Test
public void testAddedRowsCustomWhereClause() throws SQLException, IOException {
// load test data to database
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/QueryDatabaseTableTest.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/QueryDatabaseTableTest.java
index 8b51fe2..d7eec01 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/QueryDatabaseTableTest.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/QueryDatabaseTableTest.java
@@ -970,6 +970,110 @@ public class QueryDatabaseTableTest {
}
@Test
+ public void testInitialLoadStrategyStartAtBeginning() throws SQLException, IOException {
+
+ // load test data to database
+ final Connection con = ((DBCPService) runner.getControllerService("dbcp")).getConnection();
+ Statement stmt = con.createStatement();
+ InputStream in;
+
+ try {
+ stmt.execute("drop table TEST_QUERY_DB_TABLE");
+ } catch (final SQLException sqle) {
+ // Ignore this error, probably a "table does not exist" since Derby doesn't yet support DROP IF EXISTS [DERBY-4842]
+ }
+
+ stmt.execute("create table TEST_QUERY_DB_TABLE (id integer not null, name varchar(100), scale float, created_on timestamp, bignum bigint default 0)");
+
+ Calendar cal = Calendar.getInstance(TimeZone.getTimeZone("UTC"));
+ cal.setTimeInMillis(0);
+
+ SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
+ dateFormat.setTimeZone(TimeZone.getTimeZone("UTC"));
+
+ int rowCount=0;
+ //create larger row set
+ for(int batch=0;batch<10;batch++){
+ stmt.execute("insert into TEST_QUERY_DB_TABLE (id, name, scale, created_on) VALUES (" + rowCount + ", 'Joe Smith', 1.0, '" + dateFormat.format(cal.getTime().getTime()) + "')");
+
+ rowCount++;
+ cal.add(Calendar.MINUTE, 1);
+ }
+
+ runner.setProperty(QueryDatabaseTable.TABLE_NAME, "${" + TABLE_NAME_KEY + "}");
+ runner.setVariable(TABLE_NAME_KEY, "TEST_QUERY_DB_TABLE");
+ runner.setIncomingConnection(false);
+ runner.setProperty(QueryDatabaseTable.MAX_VALUE_COLUMN_NAMES, "created_on");
+ runner.setProperty(QueryDatabaseTable.INITIAL_LOAD_STRATEGY, QueryDatabaseTable.INITIAL_LOAD_STRATEGY_ALL_ROWS.getValue());
+
+ // Initial run with no previous state. Should get all 10 records
+ runner.run();
+ runner.assertAllFlowFilesTransferred(QueryDatabaseTable.REL_SUCCESS, 1);
+ in = new ByteArrayInputStream(runner.getFlowFilesForRelationship(QueryDatabaseTable.REL_SUCCESS).get(0).toByteArray());
+ assertEquals(10, getNumberOfRecordsFromStream(in));
+ runner.getStateManager().assertStateEquals("test_query_db_table" + AbstractDatabaseFetchProcessor.NAMESPACE_DELIMITER + "created_on", "1970-01-01 00:09:00.0", Scope.CLUSTER);
+ runner.clearTransferState();
+
+ // Run again, this time no flowfiles/rows should be transferred
+ // Validate Max Value doesn't change also
+ runner.run();
+ runner.assertAllFlowFilesTransferred(QueryDatabaseTable.REL_SUCCESS, 0);
+ runner.getStateManager().assertStateEquals("test_query_db_table" + AbstractDatabaseFetchProcessor.NAMESPACE_DELIMITER + "created_on", "1970-01-01 00:09:00.0", Scope.CLUSTER);
+ runner.clearTransferState();
+ }
+
+ @Test
+ public void testInitialLoadStrategyStartAtCurrentMaximumValues() throws SQLException, IOException {
+
+ // load test data to database
+ final Connection con = ((DBCPService) runner.getControllerService("dbcp")).getConnection();
+ Statement stmt = con.createStatement();
+ InputStream in;
+
+ try {
+ stmt.execute("drop table TEST_QUERY_DB_TABLE");
+ } catch (final SQLException sqle) {
+ // Ignore this error, probably a "table does not exist" since Derby doesn't yet support DROP IF EXISTS [DERBY-4842]
+ }
+
+ stmt.execute("create table TEST_QUERY_DB_TABLE (id integer not null, name varchar(100), scale float, created_on timestamp, bignum bigint default 0)");
+
+ Calendar cal = Calendar.getInstance(TimeZone.getTimeZone("UTC"));
+ cal.setTimeInMillis(0);
+
+ SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
+ dateFormat.setTimeZone(TimeZone.getTimeZone("UTC"));
+
+ int rowCount=0;
+ //create larger row set
+ for(int batch=0;batch<10;batch++){
+ stmt.execute("insert into TEST_QUERY_DB_TABLE (id, name, scale, created_on) VALUES (" + rowCount + ", 'Joe Smith', 1.0, '" + dateFormat.format(cal.getTime().getTime()) + "')");
+
+ rowCount++;
+ cal.add(Calendar.MINUTE, 1);
+ }
+
+ runner.setProperty(QueryDatabaseTable.TABLE_NAME, "${" + TABLE_NAME_KEY + "}");
+ runner.setVariable(TABLE_NAME_KEY, "TEST_QUERY_DB_TABLE");
+ runner.setIncomingConnection(false);
+ runner.setProperty(QueryDatabaseTable.MAX_VALUE_COLUMN_NAMES, "created_on");
+ runner.setProperty(QueryDatabaseTable.INITIAL_LOAD_STRATEGY, QueryDatabaseTable.INITIAL_LOAD_STRATEGY_NEW_ROWS.getValue());
+
+ // Initial run with no previous state. Should not get any records but store Max Value in the state
+ runner.run();
+ runner.assertAllFlowFilesTransferred(QueryDatabaseTable.REL_SUCCESS, 0);
+ runner.getStateManager().assertStateEquals("test_query_db_table" + AbstractDatabaseFetchProcessor.NAMESPACE_DELIMITER + "created_on", "1970-01-01 00:09:00.0", Scope.CLUSTER);
+ runner.clearTransferState();
+
+ // Run again, this time no flowfiles/rows should be transferred
+ // Validate Max Value doesn't change also
+ runner.run();
+ runner.assertAllFlowFilesTransferred(QueryDatabaseTable.REL_SUCCESS, 0);
+ runner.getStateManager().assertStateEquals("test_query_db_table" + AbstractDatabaseFetchProcessor.NAMESPACE_DELIMITER + "created_on", "1970-01-01 00:09:00.0", Scope.CLUSTER);
+ runner.clearTransferState();
+ }
+
+ @Test
public void testAddedRowsCustomWhereClause() throws ClassNotFoundException, SQLException, InitializationException, IOException {
// load test data to database