You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by al...@apache.org on 2018/07/12 01:31:16 UTC
[08/10] nifi git commit: NIFI-5349: Fixed EL handling in Initial Max
Value props for DB Fetch processors
NIFI-5349: Fixed EL handling in Initial Max Value props for DB Fetch processors
Signed-off-by: Pierre Villard <pi...@gmail.com>
This closes #2822.
Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/f4b2aae4
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/f4b2aae4
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/f4b2aae4
Branch: refs/heads/support/nifi-1.7.x
Commit: f4b2aae48ae32d2d8c7dbe89d342a749bdc32575
Parents: 7b28b91
Author: Matthew Burgess <ma...@apache.org>
Authored: Thu Jun 28 15:02:58 2018 -0400
Committer: Andy LoPresto <al...@apache.org>
Committed: Wed Jul 11 18:29:59 2018 -0700
----------------------------------------------------------------------
.../AbstractDatabaseFetchProcessor.java | 30 +---
.../processors/standard/GenerateTableFetch.java | 24 ++-
.../processors/standard/QueryDatabaseTable.java | 20 ++-
.../standard/QueryDatabaseTableTest.java | 56 +++++++
.../standard/TestGenerateTableFetch.java | 161 +++++++++++++++++++
5 files changed, 261 insertions(+), 30 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/nifi/blob/f4b2aae4/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractDatabaseFetchProcessor.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractDatabaseFetchProcessor.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractDatabaseFetchProcessor.java
index 924c7da..e13f9de 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractDatabaseFetchProcessor.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractDatabaseFetchProcessor.java
@@ -21,7 +21,6 @@ import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.dbcp.DBCPService;
-import org.apache.nifi.expression.AttributeExpression;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.AbstractSessionFactoryProcessor;
@@ -220,18 +219,6 @@ public abstract class AbstractDatabaseFetchProcessor extends AbstractSessionFact
.build();
}
- @Override
- protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) {
- return new PropertyDescriptor.Builder()
- .name(propertyDescriptorName)
- .required(false)
- .addValidator(StandardValidators.createAttributeExpressionLanguageValidator(AttributeExpression.ResultType.STRING, true))
- .addValidator(StandardValidators.ATTRIBUTE_KEY_PROPERTY_NAME_VALIDATOR)
- .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
- .dynamic(true)
- .build();
- }
-
// A common validation procedure for DB fetch processors, it stores whether the Table Name and/or Max Value Column properties have expression language
protected Collection<ValidationResult> customValidate(ValidationContext validationContext) {
// For backwards-compatibility, keep track of whether the table name and max-value column properties are dynamic (i.e. has expression language)
@@ -540,19 +527,16 @@ public abstract class AbstractDatabaseFetchProcessor extends AbstractSessionFact
return sb.toString();
}
- protected Map<String,String> getDefaultMaxValueProperties(final Map<PropertyDescriptor, String> properties){
- final Map<String,String> defaultMaxValues = new HashMap<>();
+ protected Map<String, String> getDefaultMaxValueProperties(final ProcessContext context, final FlowFile flowFile) {
+ final Map<String, String> defaultMaxValues = new HashMap<>();
- for (final Map.Entry<PropertyDescriptor, String> entry : properties.entrySet()) {
- final String key = entry.getKey().getName();
+ context.getProperties().forEach((k, v) -> {
+ final String key = k.getName();
- if(!key.startsWith(INITIAL_MAX_VALUE_PROP_START)) {
- continue;
+ if (key.startsWith(INITIAL_MAX_VALUE_PROP_START)) {
+ defaultMaxValues.put(key.substring(INITIAL_MAX_VALUE_PROP_START.length()), context.getProperty(k).evaluateAttributeExpressions(flowFile).getValue());
}
-
- defaultMaxValues.put(key.substring(INITIAL_MAX_VALUE_PROP_START.length()), entry.getValue());
- }
-
+ });
return defaultMaxValues;
}
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/f4b2aae4/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GenerateTableFetch.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GenerateTableFetch.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GenerateTableFetch.java
index 9842d6c..dd001a6 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GenerateTableFetch.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GenerateTableFetch.java
@@ -37,6 +37,7 @@ import org.apache.nifi.components.state.Scope;
import org.apache.nifi.components.state.StateManager;
import org.apache.nifi.components.state.StateMap;
import org.apache.nifi.dbcp.DBCPService;
+import org.apache.nifi.expression.AttributeExpression;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.logging.ComponentLog;
@@ -98,9 +99,11 @@ import java.util.stream.IntStream;
@WritesAttribute(attribute = "generatetablefetch.limit", description = "The number of result rows to be fetched by the SQL statement."),
@WritesAttribute(attribute = "generatetablefetch.offset", description = "Offset to be used to retrieve the corresponding partition.")
})
-@DynamicProperty(name = "Initial Max Value", value = "Attribute Expression Language",
- expressionLanguageScope = ExpressionLanguageScope.NONE, description = "Specifies an initial "
- + "max value for max value columns. Properties should be added in the format `initial.maxvalue.{max_value_column}`.")
+@DynamicProperty(name = "initial.maxvalue.<max_value_column>", value = "Initial maximum value for the specified column",
+ expressionLanguageScope = ExpressionLanguageScope.FLOWFILE_ATTRIBUTES, description = "Specifies an initial "
+ + "max value for max value columns. Properties should be added in the format `initial.maxvalue.<max_value_column>`. This value is only used the first time "
+ + "the table is accessed (when a Maximum Value Column is specified). In the case of incoming connections, the value is only used the first time for each table "
+ + "specified in the flow files.")
public class GenerateTableFetch extends AbstractDatabaseFetchProcessor {
public static final PropertyDescriptor PARTITION_SIZE = new PropertyDescriptor.Builder()
@@ -165,6 +168,18 @@ public class GenerateTableFetch extends AbstractDatabaseFetchProcessor {
}
@Override
+ protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) {
+ return new PropertyDescriptor.Builder()
+ .name(propertyDescriptorName)
+ .required(false)
+ .addValidator(StandardValidators.createAttributeExpressionLanguageValidator(AttributeExpression.ResultType.STRING, true))
+ .addValidator(StandardValidators.ATTRIBUTE_KEY_PROPERTY_NAME_VALIDATOR)
+ .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+ .dynamic(true)
+ .build();
+ }
+
+ @Override
protected Collection<ValidationResult> customValidate(ValidationContext validationContext) {
List<ValidationResult> results = new ArrayList<>(super.customValidate(validationContext));
final PropertyValue columnForPartitioning = validationContext.getProperty(COLUMN_FOR_VALUE_PARTITIONING);
@@ -180,7 +195,6 @@ public class GenerateTableFetch extends AbstractDatabaseFetchProcessor {
@Override
@OnScheduled
public void setup(final ProcessContext context) {
- maxValueProperties = getDefaultMaxValueProperties(context.getProperties());
if (context.hasIncomingConnection() && !context.hasNonLoopConnection()) {
getLogger().error("The failure relationship can be used only if there is another incoming connection to this processor.");
}
@@ -209,6 +223,8 @@ public class GenerateTableFetch extends AbstractDatabaseFetchProcessor {
return;
}
}
+ maxValueProperties = getDefaultMaxValueProperties(context, fileToProcess);
+
final ComponentLog logger = getLogger();
http://git-wip-us.apache.org/repos/asf/nifi/blob/f4b2aae4/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryDatabaseTable.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryDatabaseTable.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryDatabaseTable.java
index 1dfe64c..b245f57 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryDatabaseTable.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryDatabaseTable.java
@@ -34,6 +34,7 @@ import org.apache.nifi.components.state.Scope;
import org.apache.nifi.components.state.StateManager;
import org.apache.nifi.components.state.StateMap;
import org.apache.nifi.dbcp.DBCPService;
+import org.apache.nifi.expression.AttributeExpression;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
@@ -107,8 +108,9 @@ import static org.apache.nifi.processors.standard.util.JdbcCommon.USE_AVRO_LOGIC
+ "FlowFiles were produced"),
@WritesAttribute(attribute = "maxvalue.*", description = "Each attribute contains the observed maximum value of a specified 'Maximum-value Column'. The "
+ "suffix of the attribute is the name of the column. If Output Batch Size is set, then this attribute will not be populated.")})
-@DynamicProperty(name = "Initial Max Value", value = "Attribute Expression Language", expressionLanguageScope = ExpressionLanguageScope.NONE,
- description = "Specifies an initial max value for max value columns. Properties should be added in the format `initial.maxvalue.{max_value_column}`.")
+@DynamicProperty(name = "initial.maxvalue.<max_value_column>", value = "Initial maximum value for the specified column",
+ expressionLanguageScope = ExpressionLanguageScope.VARIABLE_REGISTRY, description = "Specifies an initial max value for max value column(s). Properties should "
+ + "be added in the format `initial.maxvalue.<max_value_column>`. This value is only used the first time the table is accessed (when a Maximum Value Column is specified).")
public class QueryDatabaseTable extends AbstractDatabaseFetchProcessor {
public static final String RESULT_TABLENAME = "tablename";
@@ -200,9 +202,21 @@ public class QueryDatabaseTable extends AbstractDatabaseFetchProcessor {
return propDescriptors;
}
+ @Override
+ protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) {
+ return new PropertyDescriptor.Builder()
+ .name(propertyDescriptorName)
+ .required(false)
+ .addValidator(StandardValidators.createAttributeExpressionLanguageValidator(AttributeExpression.ResultType.STRING, true))
+ .addValidator(StandardValidators.ATTRIBUTE_KEY_PROPERTY_NAME_VALIDATOR)
+ .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+ .dynamic(true)
+ .build();
+ }
+
@OnScheduled
public void setup(final ProcessContext context) {
- maxValueProperties = getDefaultMaxValueProperties(context.getProperties());
+ maxValueProperties = getDefaultMaxValueProperties(context, null);
}
@OnStopped
http://git-wip-us.apache.org/repos/asf/nifi/blob/f4b2aae4/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/QueryDatabaseTableTest.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/QueryDatabaseTableTest.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/QueryDatabaseTableTest.java
index 87bf2a2..a27c529 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/QueryDatabaseTableTest.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/QueryDatabaseTableTest.java
@@ -897,6 +897,62 @@ public class QueryDatabaseTableTest {
runner.assertAllFlowFilesTransferred(QueryDatabaseTable.REL_SUCCESS, 0);
runner.getStateManager().assertStateEquals("test_query_db_table" + AbstractDatabaseFetchProcessor.NAMESPACE_DELIMITER + "created_on", "1970-01-01 00:09:00.0", Scope.CLUSTER);
runner.clearTransferState();
+ }
+
+ @Test
+ public void testInitialMaxValueWithEL() throws ClassNotFoundException, SQLException, InitializationException, IOException {
+
+ // load test data to database
+ final Connection con = ((DBCPService) runner.getControllerService("dbcp")).getConnection();
+ Statement stmt = con.createStatement();
+ InputStream in;
+
+ try {
+ stmt.execute("drop table TEST_QUERY_DB_TABLE");
+ } catch (final SQLException sqle) {
+ // Ignore this error, probably a "table does not exist" since Derby doesn't yet support DROP IF EXISTS [DERBY-4842]
+ }
+
+ stmt.execute("create table TEST_QUERY_DB_TABLE (id integer not null, name varchar(100), scale float, created_on timestamp, bignum bigint default 0)");
+
+ Calendar cal = Calendar.getInstance(TimeZone.getTimeZone("UTC"));
+ cal.setTimeInMillis(0);
+
+ SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
+ dateFormat.setTimeZone(TimeZone.getTimeZone("UTC"));
+
+ int rowCount=0;
+ //create larger row set
+ for(int batch=0;batch<10;batch++){
+ stmt.execute("insert into TEST_QUERY_DB_TABLE (id, name, scale, created_on) VALUES (" + rowCount + ", 'Joe Smith', 1.0, '" + dateFormat.format(cal.getTime().getTime()) + "')");
+
+ rowCount++;
+ cal.add(Calendar.MINUTE, 1);
+ }
+
+ runner.setProperty(QueryDatabaseTable.TABLE_NAME, "${" + TABLE_NAME_KEY + "}");
+ runner.setVariable(TABLE_NAME_KEY, "TEST_QUERY_DB_TABLE");
+ runner.setIncomingConnection(false);
+ runner.setProperty(QueryDatabaseTable.MAX_VALUE_COLUMN_NAMES, "created_on");
+
+ cal.setTimeInMillis(0);
+ cal.add(Calendar.MINUTE, 5);
+ runner.setProperty("initial.maxvalue.CREATED_ON", "${created.on}");
+ runner.setVariable("created.on", dateFormat.format(cal.getTime().getTime()));
+ // Initial run with no previous state. Should get only last 4 records
+ runner.run();
+ runner.assertAllFlowFilesTransferred(QueryDatabaseTable.REL_SUCCESS, 1);
+ in = new ByteArrayInputStream(runner.getFlowFilesForRelationship(QueryDatabaseTable.REL_SUCCESS).get(0).toByteArray());
+ assertEquals(4, getNumberOfRecordsFromStream(in));
+ runner.getStateManager().assertStateEquals("test_query_db_table" + AbstractDatabaseFetchProcessor.NAMESPACE_DELIMITER + "created_on", "1970-01-01 00:09:00.0", Scope.CLUSTER);
+ runner.clearTransferState();
+
+ // Run again, this time no flowfiles/rows should be transferred
+ // Validate Max Value doesn't change also
+ runner.run();
+ runner.assertAllFlowFilesTransferred(QueryDatabaseTable.REL_SUCCESS, 0);
+ runner.getStateManager().assertStateEquals("test_query_db_table" + AbstractDatabaseFetchProcessor.NAMESPACE_DELIMITER + "created_on", "1970-01-01 00:09:00.0", Scope.CLUSTER);
+ runner.clearTransferState();
// Append a new row, expect 1 flowfile one row
cal.setTimeInMillis(0);
http://git-wip-us.apache.org/repos/asf/nifi/blob/f4b2aae4/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestGenerateTableFetch.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestGenerateTableFetch.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestGenerateTableFetch.java
index f6c27fa..44dcadf 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestGenerateTableFetch.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestGenerateTableFetch.java
@@ -964,6 +964,167 @@ public class TestGenerateTableFetch {
}
@Test
+ public void testInitialMaxValueWithEL() throws ClassNotFoundException, SQLException, InitializationException, IOException {
+
+ // load test data to database
+ final Connection con = ((DBCPService) runner.getControllerService("dbcp")).getConnection();
+ Statement stmt = con.createStatement();
+
+ try {
+ stmt.execute("drop table TEST_QUERY_DB_TABLE");
+ } catch (final SQLException sqle) {
+ // Ignore this error, probably a "table does not exist" since Derby doesn't yet support DROP IF EXISTS [DERBY-4842]
+ }
+
+ stmt.execute("create table TEST_QUERY_DB_TABLE (id integer not null, name varchar(100), scale float, created_on timestamp, bignum bigint default 0)");
+ stmt.execute("insert into TEST_QUERY_DB_TABLE (id, name, scale, created_on) VALUES (0, 'Joe Smith', 1.0, '1962-09-23 03:23:34.234')");
+ stmt.execute("insert into TEST_QUERY_DB_TABLE (id, name, scale, created_on) VALUES (1, 'Carrie Jones', 5.0, '2000-01-01 03:23:34.234')");
+ stmt.execute("insert into TEST_QUERY_DB_TABLE (id, name, scale, created_on) VALUES (2, NULL, 2.0, '2010-01-01 00:00:00')");
+
+ runner.setProperty(GenerateTableFetch.TABLE_NAME, "TEST_QUERY_DB_TABLE");
+ runner.setIncomingConnection(false);
+ runner.setProperty(GenerateTableFetch.MAX_VALUE_COLUMN_NAMES, "ID");
+ runner.setProperty("initial.maxvalue.ID", "${maxval.id}");
+ runner.setVariable("maxval.id", "1");
+
+ runner.run();
+ runner.assertAllFlowFilesTransferred(REL_SUCCESS, 1);
+ MockFlowFile flowFile = runner.getFlowFilesForRelationship(REL_SUCCESS).get(0);
+ String query = new String(flowFile.toByteArray());
+ assertEquals("SELECT * FROM TEST_QUERY_DB_TABLE WHERE ID > 1 AND ID <= 2 ORDER BY ID FETCH NEXT 10000 ROWS ONLY", query);
+ ResultSet resultSet = stmt.executeQuery(query);
+ // Should be one record (the initial max value skips the first two)
+ assertTrue(resultSet.next());
+ assertFalse(resultSet.next());
+ runner.clearTransferState();
+
+ // Run again, this time no flowfiles/rows should be transferred
+ runner.run();
+ runner.assertAllFlowFilesTransferred(REL_SUCCESS, 0);
+ runner.clearTransferState();
+ }
+
+ @Test
+ public void testInitialMaxValueWithELAndIncoming() throws ClassNotFoundException, SQLException, InitializationException, IOException {
+
+ // load test data to database
+ final Connection con = ((DBCPService) runner.getControllerService("dbcp")).getConnection();
+ Statement stmt = con.createStatement();
+
+ try {
+ stmt.execute("drop table TEST_QUERY_DB_TABLE");
+ } catch (final SQLException sqle) {
+ // Ignore this error, probably a "table does not exist" since Derby doesn't yet support DROP IF EXISTS [DERBY-4842]
+ }
+
+ stmt.execute("create table TEST_QUERY_DB_TABLE (id integer not null, name varchar(100), scale float, created_on timestamp, bignum bigint default 0)");
+ stmt.execute("insert into TEST_QUERY_DB_TABLE (id, name, scale, created_on) VALUES (0, 'Joe Smith', 1.0, '1962-09-23 03:23:34.234')");
+ stmt.execute("insert into TEST_QUERY_DB_TABLE (id, name, scale, created_on) VALUES (1, 'Carrie Jones', 5.0, '2000-01-01 03:23:34.234')");
+ stmt.execute("insert into TEST_QUERY_DB_TABLE (id, name, scale, created_on) VALUES (2, NULL, 2.0, '2010-01-01 00:00:00')");
+
+ runner.setProperty(GenerateTableFetch.TABLE_NAME, "TEST_QUERY_DB_TABLE");
+ runner.setProperty(GenerateTableFetch.MAX_VALUE_COLUMN_NAMES, "ID");
+ runner.setProperty("initial.maxvalue.ID", "${maxval.id}");
+ Map<String,String> attrs = new HashMap<String,String>() {{
+ put("maxval.id", "1");
+ }};
+ runner.setIncomingConnection(true);
+ runner.enqueue(new byte[0], attrs);
+ runner.run();
+ runner.assertAllFlowFilesTransferred(REL_SUCCESS, 1);
+ MockFlowFile flowFile = runner.getFlowFilesForRelationship(REL_SUCCESS).get(0);
+ String query = new String(flowFile.toByteArray());
+ assertEquals("SELECT * FROM TEST_QUERY_DB_TABLE WHERE ID > 1 AND ID <= 2 ORDER BY ID FETCH NEXT 10000 ROWS ONLY", query);
+ ResultSet resultSet = stmt.executeQuery(query);
+ // Should be one record (the initial max value skips the first two)
+ assertTrue(resultSet.next());
+ assertFalse(resultSet.next());
+ runner.clearTransferState();
+
+ // Run again, this time no flowfiles/rows should be transferred
+ runner.enqueue(new byte[0], attrs);
+ runner.run();
+ runner.assertAllFlowFilesTransferred(REL_SUCCESS, 0);
+ runner.clearTransferState();
+ }
+
+ @Test
+ public void testInitialMaxValueWithELAndMultipleTables() throws ClassNotFoundException, SQLException, InitializationException, IOException {
+
+ // load test data to database
+ final Connection con = ((DBCPService) runner.getControllerService("dbcp")).getConnection();
+ Statement stmt = con.createStatement();
+
+ try {
+ stmt.execute("drop table TEST_QUERY_DB_TABLE");
+ } catch (final SQLException sqle) {
+ // Ignore this error, probably a "table does not exist" since Derby doesn't yet support DROP IF EXISTS [DERBY-4842]
+ }
+
+ stmt.execute("create table TEST_QUERY_DB_TABLE (id integer not null, name varchar(100), scale float, created_on timestamp, bignum bigint default 0)");
+ stmt.execute("insert into TEST_QUERY_DB_TABLE (id, name, scale, created_on) VALUES (0, 'Joe Smith', 1.0, '1962-09-23 03:23:34.234')");
+ stmt.execute("insert into TEST_QUERY_DB_TABLE (id, name, scale, created_on) VALUES (1, 'Carrie Jones', 5.0, '2000-01-01 03:23:34.234')");
+ stmt.execute("insert into TEST_QUERY_DB_TABLE (id, name, scale, created_on) VALUES (2, NULL, 2.0, '2010-01-01 00:00:00')");
+
+ runner.setProperty(GenerateTableFetch.TABLE_NAME, "${table.name}");
+ runner.setProperty(GenerateTableFetch.MAX_VALUE_COLUMN_NAMES, "ID");
+ runner.setProperty("initial.maxvalue.ID", "${maxval.id}");
+ Map<String,String> attrs = new HashMap<String,String>() {{
+ put("maxval.id", "1");
+ put("table.name", "TEST_QUERY_DB_TABLE");
+ }};
+ runner.setIncomingConnection(true);
+ runner.enqueue(new byte[0], attrs);
+ runner.run();
+ runner.assertAllFlowFilesTransferred(REL_SUCCESS, 1);
+ MockFlowFile flowFile = runner.getFlowFilesForRelationship(REL_SUCCESS).get(0);
+ String query = new String(flowFile.toByteArray());
+ assertEquals("SELECT * FROM TEST_QUERY_DB_TABLE WHERE ID > 1 AND ID <= 2 ORDER BY ID FETCH NEXT 10000 ROWS ONLY", query);
+ ResultSet resultSet = stmt.executeQuery(query);
+ // Should be one record (the initial max value skips the first two)
+ assertTrue(resultSet.next());
+ assertFalse(resultSet.next());
+ runner.clearTransferState();
+
+ // Run again, this time no flowfiles/rows should be transferred
+ runner.enqueue(new byte[0], attrs);
+ runner.run();
+ runner.assertAllFlowFilesTransferred(REL_SUCCESS, 0);
+ runner.clearTransferState();
+
+ // Initial Max Value for second table
+ try {
+ stmt.execute("drop table TEST_QUERY_DB_TABLE2");
+ } catch (final SQLException sqle) {
+ // Ignore this error, probably a "table does not exist" since Derby doesn't yet support DROP IF EXISTS [DERBY-4842]
+ }
+
+ stmt.execute("create table TEST_QUERY_DB_TABLE2 (id integer not null, name varchar(100), scale float, created_on timestamp, bignum bigint default 0)");
+ stmt.execute("insert into TEST_QUERY_DB_TABLE2 (id, name, scale, created_on) VALUES (0, 'Joe Smith', 1.0, '1962-09-23 03:23:34.234')");
+ stmt.execute("insert into TEST_QUERY_DB_TABLE2 (id, name, scale, created_on) VALUES (1, 'Carrie Jones', 5.0, '2000-01-01 03:23:34.234')");
+ stmt.execute("insert into TEST_QUERY_DB_TABLE2 (id, name, scale, created_on) VALUES (2, NULL, 2.0, '2010-01-01 00:00:00')");
+
+ attrs.put("table.name", "TEST_QUERY_DB_TABLE2");
+ runner.enqueue(new byte[0], attrs);
+ runner.run();
+ runner.assertAllFlowFilesTransferred(REL_SUCCESS, 1);
+ flowFile = runner.getFlowFilesForRelationship(REL_SUCCESS).get(0);
+ query = new String(flowFile.toByteArray());
+ assertEquals("SELECT * FROM TEST_QUERY_DB_TABLE2 WHERE ID > 1 AND ID <= 2 ORDER BY ID FETCH NEXT 10000 ROWS ONLY", query);
+ resultSet = stmt.executeQuery(query);
+ // Should be one record (the initial max value skips the first two)
+ assertTrue(resultSet.next());
+ assertFalse(resultSet.next());
+ runner.clearTransferState();
+
+ // Run again, this time no flowfiles/rows should be transferred
+ runner.enqueue(new byte[0], attrs);
+ runner.run();
+ runner.assertAllFlowFilesTransferred(REL_SUCCESS, 0);
+ runner.clearTransferState();
+ }
+
+ @Test
public void testNoDuplicateWithRightBounded() throws ClassNotFoundException, SQLException, InitializationException, IOException {
// load test data to database