Posted to commits@nifi.apache.org by al...@apache.org on 2018/07/12 01:31:16 UTC

[08/10] nifi git commit: NIFI-5349: Fixed EL handling in Initial Max Value props for DB Fetch processors

NIFI-5349: Fixed EL handling in Initial Max Value props for DB Fetch processors

Signed-off-by: Pierre Villard <pi...@gmail.com>

This closes #2822.


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/f4b2aae4
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/f4b2aae4
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/f4b2aae4

Branch: refs/heads/support/nifi-1.7.x
Commit: f4b2aae48ae32d2d8c7dbe89d342a749bdc32575
Parents: 7b28b91
Author: Matthew Burgess <ma...@apache.org>
Authored: Thu Jun 28 15:02:58 2018 -0400
Committer: Andy LoPresto <al...@apache.org>
Committed: Wed Jul 11 18:29:59 2018 -0700

----------------------------------------------------------------------
 .../AbstractDatabaseFetchProcessor.java         |  30 +---
 .../processors/standard/GenerateTableFetch.java |  24 ++-
 .../processors/standard/QueryDatabaseTable.java |  20 ++-
 .../standard/QueryDatabaseTableTest.java        |  56 +++++++
 .../standard/TestGenerateTableFetch.java        | 161 +++++++++++++++++++
 5 files changed, 261 insertions(+), 30 deletions(-)
----------------------------------------------------------------------
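
For context, the core of the fix (shown in full in the AbstractDatabaseFetchProcessor diff below) is that the `initial.maxvalue.*` dynamic properties are now read through the ProcessContext so that Expression Language is evaluated, optionally against an incoming FlowFile. A minimal sketch of the reworked helper, assuming the NiFi 1.7.x API used elsewhere in this commit (the wrapper class name is illustrative):

    import java.util.HashMap;
    import java.util.Map;

    import org.apache.nifi.components.PropertyDescriptor;
    import org.apache.nifi.flowfile.FlowFile;
    import org.apache.nifi.processor.ProcessContext;

    // Simplified sketch only; the real method lives in AbstractDatabaseFetchProcessor (see diff below).
    class InitialMaxValueSketch {
        static final String INITIAL_MAX_VALUE_PROP_START = "initial.maxvalue.";

        static Map<String, String> getDefaultMaxValueProperties(final ProcessContext context, final FlowFile flowFile) {
            final Map<String, String> defaultMaxValues = new HashMap<>();
            for (final PropertyDescriptor descriptor : context.getProperties().keySet()) {
                final String key = descriptor.getName();
                if (key.startsWith(INITIAL_MAX_VALUE_PROP_START)) {
                    // Evaluate EL here; flowFile may be null (QueryDatabaseTable), in which case
                    // only the variable registry is consulted.
                    defaultMaxValues.put(key.substring(INITIAL_MAX_VALUE_PROP_START.length()),
                            context.getProperty(descriptor).evaluateAttributeExpressions(flowFile).getValue());
                }
            }
            return defaultMaxValues;
        }
    }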


http://git-wip-us.apache.org/repos/asf/nifi/blob/f4b2aae4/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractDatabaseFetchProcessor.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractDatabaseFetchProcessor.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractDatabaseFetchProcessor.java
index 924c7da..e13f9de 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractDatabaseFetchProcessor.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractDatabaseFetchProcessor.java
@@ -21,7 +21,6 @@ import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.components.ValidationContext;
 import org.apache.nifi.components.ValidationResult;
 import org.apache.nifi.dbcp.DBCPService;
-import org.apache.nifi.expression.AttributeExpression;
 import org.apache.nifi.expression.ExpressionLanguageScope;
 import org.apache.nifi.flowfile.FlowFile;
 import org.apache.nifi.processor.AbstractSessionFactoryProcessor;
@@ -220,18 +219,6 @@ public abstract class AbstractDatabaseFetchProcessor extends AbstractSessionFact
                 .build();
     }
 
-    @Override
-    protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) {
-        return new PropertyDescriptor.Builder()
-                .name(propertyDescriptorName)
-                .required(false)
-                .addValidator(StandardValidators.createAttributeExpressionLanguageValidator(AttributeExpression.ResultType.STRING, true))
-                .addValidator(StandardValidators.ATTRIBUTE_KEY_PROPERTY_NAME_VALIDATOR)
-                .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
-                .dynamic(true)
-                .build();
-    }
-
     // A common validation procedure for DB fetch processors, it stores whether the Table Name and/or Max Value Column properties have expression language
     protected Collection<ValidationResult> customValidate(ValidationContext validationContext) {
         // For backwards-compatibility, keep track of whether the table name and max-value column properties are dynamic (i.e. has expression language)
@@ -540,19 +527,16 @@ public abstract class AbstractDatabaseFetchProcessor extends AbstractSessionFact
         return sb.toString();
     }
 
-    protected Map<String,String> getDefaultMaxValueProperties(final Map<PropertyDescriptor, String> properties){
-        final Map<String,String> defaultMaxValues = new HashMap<>();
+    protected Map<String, String> getDefaultMaxValueProperties(final ProcessContext context, final FlowFile flowFile) {
+        final Map<String, String> defaultMaxValues = new HashMap<>();
 
-        for (final Map.Entry<PropertyDescriptor, String> entry : properties.entrySet()) {
-            final String key = entry.getKey().getName();
+        context.getProperties().forEach((k, v) -> {
+            final String key = k.getName();
 
-            if(!key.startsWith(INITIAL_MAX_VALUE_PROP_START)) {
-                continue;
+            if (key.startsWith(INITIAL_MAX_VALUE_PROP_START)) {
+                defaultMaxValues.put(key.substring(INITIAL_MAX_VALUE_PROP_START.length()), context.getProperty(k).evaluateAttributeExpressions(flowFile).getValue());
             }
-
-            defaultMaxValues.put(key.substring(INITIAL_MAX_VALUE_PROP_START.length()), entry.getValue());
-        }
-
+        });
         return defaultMaxValues;
     }
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/f4b2aae4/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GenerateTableFetch.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GenerateTableFetch.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GenerateTableFetch.java
index 9842d6c..dd001a6 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GenerateTableFetch.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GenerateTableFetch.java
@@ -37,6 +37,7 @@ import org.apache.nifi.components.state.Scope;
 import org.apache.nifi.components.state.StateManager;
 import org.apache.nifi.components.state.StateMap;
 import org.apache.nifi.dbcp.DBCPService;
+import org.apache.nifi.expression.AttributeExpression;
 import org.apache.nifi.expression.ExpressionLanguageScope;
 import org.apache.nifi.flowfile.FlowFile;
 import org.apache.nifi.logging.ComponentLog;
@@ -98,9 +99,11 @@ import java.util.stream.IntStream;
         @WritesAttribute(attribute = "generatetablefetch.limit", description = "The number of result rows to be fetched by the SQL statement."),
         @WritesAttribute(attribute = "generatetablefetch.offset", description = "Offset to be used to retrieve the corresponding partition.")
 })
-@DynamicProperty(name = "Initial Max Value", value = "Attribute Expression Language",
-        expressionLanguageScope = ExpressionLanguageScope.NONE, description = "Specifies an initial "
-        + "max value for max value columns. Properties should be added in the format `initial.maxvalue.{max_value_column}`.")
+@DynamicProperty(name = "initial.maxvalue.<max_value_column>", value = "Initial maximum value for the specified column",
+        expressionLanguageScope = ExpressionLanguageScope.FLOWFILE_ATTRIBUTES, description = "Specifies an initial "
+        + "max value for max value columns. Properties should be added in the format `initial.maxvalue.<max_value_column>`. This value is only used the first time "
+        + "the table is accessed (when a Maximum Value Column is specified). In the case of incoming connections, the value is only used the first time for each table "
+        + "specified in the flow files.")
 public class GenerateTableFetch extends AbstractDatabaseFetchProcessor {
 
     public static final PropertyDescriptor PARTITION_SIZE = new PropertyDescriptor.Builder()
@@ -165,6 +168,18 @@ public class GenerateTableFetch extends AbstractDatabaseFetchProcessor {
     }
 
     @Override
+    protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) {
+        return new PropertyDescriptor.Builder()
+                .name(propertyDescriptorName)
+                .required(false)
+                .addValidator(StandardValidators.createAttributeExpressionLanguageValidator(AttributeExpression.ResultType.STRING, true))
+                .addValidator(StandardValidators.ATTRIBUTE_KEY_PROPERTY_NAME_VALIDATOR)
+                .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+                .dynamic(true)
+                .build();
+    }
+
+    @Override
     protected Collection<ValidationResult> customValidate(ValidationContext validationContext) {
         List<ValidationResult> results = new ArrayList<>(super.customValidate(validationContext));
         final PropertyValue columnForPartitioning = validationContext.getProperty(COLUMN_FOR_VALUE_PARTITIONING);
@@ -180,7 +195,6 @@ public class GenerateTableFetch extends AbstractDatabaseFetchProcessor {
     @Override
     @OnScheduled
     public void setup(final ProcessContext context) {
-        maxValueProperties = getDefaultMaxValueProperties(context.getProperties());
         if (context.hasIncomingConnection() && !context.hasNonLoopConnection()) {
             getLogger().error("The failure relationship can be used only if there is another incoming connection to this processor.");
         }
@@ -209,6 +223,8 @@ public class GenerateTableFetch extends AbstractDatabaseFetchProcessor {
                 return;
             }
         }
+        maxValueProperties = getDefaultMaxValueProperties(context, fileToProcess);
+
 
         final ComponentLog logger = getLogger();
 

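To make the new FLOWFILE_ATTRIBUTES scope concrete: with an incoming connection, a property such as `initial.maxvalue.ID` set to `${maxval.id}` is now resolved from each triggering flow file's attributes, and the result is only applied the first time a given table is fetched. A minimal TestRunner-style fragment, mirroring TestGenerateTableFetch#testInitialMaxValueWithELAndIncoming further down (attribute names are illustrative):

    // Fragment only; assumes a TestRunner configured for GenerateTableFetch as in the tests below.
    runner.setProperty(GenerateTableFetch.MAX_VALUE_COLUMN_NAMES, "ID");
    runner.setProperty("initial.maxvalue.ID", "${maxval.id}");                  // EL is now honored here
    runner.enqueue(new byte[0], Collections.singletonMap("maxval.id", "1"));    // attribute supplies the value
    runner.run();                                                               // first query starts after ID = 1
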
http://git-wip-us.apache.org/repos/asf/nifi/blob/f4b2aae4/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryDatabaseTable.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryDatabaseTable.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryDatabaseTable.java
index 1dfe64c..b245f57 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryDatabaseTable.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryDatabaseTable.java
@@ -34,6 +34,7 @@ import org.apache.nifi.components.state.Scope;
 import org.apache.nifi.components.state.StateManager;
 import org.apache.nifi.components.state.StateMap;
 import org.apache.nifi.dbcp.DBCPService;
+import org.apache.nifi.expression.AttributeExpression;
 import org.apache.nifi.expression.ExpressionLanguageScope;
 import org.apache.nifi.flowfile.FlowFile;
 import org.apache.nifi.flowfile.attributes.CoreAttributes;
@@ -107,8 +108,9 @@ import static org.apache.nifi.processors.standard.util.JdbcCommon.USE_AVRO_LOGIC
                 + "FlowFiles were produced"),
         @WritesAttribute(attribute = "maxvalue.*", description = "Each attribute contains the observed maximum value of a specified 'Maximum-value Column'. The "
                 + "suffix of the attribute is the name of the column. If Output Batch Size is set, then this attribute will not be populated.")})
-@DynamicProperty(name = "Initial Max Value", value = "Attribute Expression Language", expressionLanguageScope = ExpressionLanguageScope.NONE,
-                description = "Specifies an initial max value for max value columns. Properties should be added in the format `initial.maxvalue.{max_value_column}`.")
+@DynamicProperty(name = "initial.maxvalue.<max_value_column>", value = "Initial maximum value for the specified column",
+        expressionLanguageScope = ExpressionLanguageScope.VARIABLE_REGISTRY, description = "Specifies an initial max value for max value column(s). Properties should "
+        + "be added in the format `initial.maxvalue.<max_value_column>`. This value is only used the first time the table is accessed (when a Maximum Value Column is specified).")
 public class QueryDatabaseTable extends AbstractDatabaseFetchProcessor {
 
     public static final String RESULT_TABLENAME = "tablename";
@@ -200,9 +202,21 @@ public class QueryDatabaseTable extends AbstractDatabaseFetchProcessor {
         return propDescriptors;
     }
 
+    @Override
+    protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) {
+        return new PropertyDescriptor.Builder()
+                .name(propertyDescriptorName)
+                .required(false)
+                .addValidator(StandardValidators.createAttributeExpressionLanguageValidator(AttributeExpression.ResultType.STRING, true))
+                .addValidator(StandardValidators.ATTRIBUTE_KEY_PROPERTY_NAME_VALIDATOR)
+                .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+                .dynamic(true)
+                .build();
+    }
+
     @OnScheduled
     public void setup(final ProcessContext context) {
-        maxValueProperties = getDefaultMaxValueProperties(context.getProperties());
+        maxValueProperties = getDefaultMaxValueProperties(context, null);
     }
 
     @OnStopped

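By contrast, QueryDatabaseTable accepts no incoming connections, so its `initial.maxvalue.*` properties are scoped to the variable registry and resolved once at scheduling time with a null FlowFile (getDefaultMaxValueProperties(context, null) above). A minimal fragment, mirroring QueryDatabaseTableTest#testInitialMaxValueWithEL below (the variable value is illustrative):

    // Fragment only; assumes a TestRunner configured for QueryDatabaseTable as in the tests below.
    runner.setProperty(QueryDatabaseTable.MAX_VALUE_COLUMN_NAMES, "created_on");
    runner.setProperty("initial.maxvalue.CREATED_ON", "${created.on}");
    runner.setVariable("created.on", "1970-01-01 00:05:00.000");   // resolved from the variable registry, not flow file attributes
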
http://git-wip-us.apache.org/repos/asf/nifi/blob/f4b2aae4/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/QueryDatabaseTableTest.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/QueryDatabaseTableTest.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/QueryDatabaseTableTest.java
index 87bf2a2..a27c529 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/QueryDatabaseTableTest.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/QueryDatabaseTableTest.java
@@ -897,6 +897,62 @@ public class QueryDatabaseTableTest {
         runner.assertAllFlowFilesTransferred(QueryDatabaseTable.REL_SUCCESS, 0);
         runner.getStateManager().assertStateEquals("test_query_db_table" + AbstractDatabaseFetchProcessor.NAMESPACE_DELIMITER + "created_on", "1970-01-01 00:09:00.0", Scope.CLUSTER);
         runner.clearTransferState();
+    }
+
+    @Test
+    public void testInitialMaxValueWithEL() throws ClassNotFoundException, SQLException, InitializationException, IOException {
+
+        // load test data to database
+        final Connection con = ((DBCPService) runner.getControllerService("dbcp")).getConnection();
+        Statement stmt = con.createStatement();
+        InputStream in;
+
+        try {
+            stmt.execute("drop table TEST_QUERY_DB_TABLE");
+        } catch (final SQLException sqle) {
+            // Ignore this error, probably a "table does not exist" since Derby doesn't yet support DROP IF EXISTS [DERBY-4842]
+        }
+
+        stmt.execute("create table TEST_QUERY_DB_TABLE (id integer not null, name varchar(100), scale float, created_on timestamp, bignum bigint default 0)");
+
+        Calendar cal = Calendar.getInstance(TimeZone.getTimeZone("UTC"));
+        cal.setTimeInMillis(0);
+
+        SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
+        dateFormat.setTimeZone(TimeZone.getTimeZone("UTC"));
+
+        int rowCount=0;
+        //create larger row set
+        for(int batch=0;batch<10;batch++){
+            stmt.execute("insert into TEST_QUERY_DB_TABLE (id, name, scale, created_on) VALUES (" + rowCount + ", 'Joe Smith', 1.0, '" + dateFormat.format(cal.getTime().getTime()) + "')");
+
+            rowCount++;
+            cal.add(Calendar.MINUTE, 1);
+        }
+
+        runner.setProperty(QueryDatabaseTable.TABLE_NAME, "${" + TABLE_NAME_KEY + "}");
+        runner.setVariable(TABLE_NAME_KEY, "TEST_QUERY_DB_TABLE");
+        runner.setIncomingConnection(false);
+        runner.setProperty(QueryDatabaseTable.MAX_VALUE_COLUMN_NAMES, "created_on");
+
+        cal.setTimeInMillis(0);
+        cal.add(Calendar.MINUTE, 5);
+        runner.setProperty("initial.maxvalue.CREATED_ON", "${created.on}");
+        runner.setVariable("created.on", dateFormat.format(cal.getTime().getTime()));
+        // Initial run with no previous state. Should get only last 4 records
+        runner.run();
+        runner.assertAllFlowFilesTransferred(QueryDatabaseTable.REL_SUCCESS, 1);
+        in = new ByteArrayInputStream(runner.getFlowFilesForRelationship(QueryDatabaseTable.REL_SUCCESS).get(0).toByteArray());
+        assertEquals(4, getNumberOfRecordsFromStream(in));
+        runner.getStateManager().assertStateEquals("test_query_db_table" + AbstractDatabaseFetchProcessor.NAMESPACE_DELIMITER + "created_on", "1970-01-01 00:09:00.0", Scope.CLUSTER);
+        runner.clearTransferState();
+
+        // Run again, this time no flowfiles/rows should be transferred
+        // Validate Max Value doesn't change also
+        runner.run();
+        runner.assertAllFlowFilesTransferred(QueryDatabaseTable.REL_SUCCESS, 0);
+        runner.getStateManager().assertStateEquals("test_query_db_table" + AbstractDatabaseFetchProcessor.NAMESPACE_DELIMITER + "created_on", "1970-01-01 00:09:00.0", Scope.CLUSTER);
+        runner.clearTransferState();
 
         // Append a new row, expect 1 flowfile one row
         cal.setTimeInMillis(0);

http://git-wip-us.apache.org/repos/asf/nifi/blob/f4b2aae4/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestGenerateTableFetch.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestGenerateTableFetch.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestGenerateTableFetch.java
index f6c27fa..44dcadf 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestGenerateTableFetch.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestGenerateTableFetch.java
@@ -964,6 +964,167 @@ public class TestGenerateTableFetch {
     }
 
     @Test
+    public void testInitialMaxValueWithEL() throws ClassNotFoundException, SQLException, InitializationException, IOException {
+
+        // load test data to database
+        final Connection con = ((DBCPService) runner.getControllerService("dbcp")).getConnection();
+        Statement stmt = con.createStatement();
+
+        try {
+            stmt.execute("drop table TEST_QUERY_DB_TABLE");
+        } catch (final SQLException sqle) {
+            // Ignore this error, probably a "table does not exist" since Derby doesn't yet support DROP IF EXISTS [DERBY-4842]
+        }
+
+        stmt.execute("create table TEST_QUERY_DB_TABLE (id integer not null, name varchar(100), scale float, created_on timestamp, bignum bigint default 0)");
+        stmt.execute("insert into TEST_QUERY_DB_TABLE (id, name, scale, created_on) VALUES (0, 'Joe Smith', 1.0, '1962-09-23 03:23:34.234')");
+        stmt.execute("insert into TEST_QUERY_DB_TABLE (id, name, scale, created_on) VALUES (1, 'Carrie Jones', 5.0, '2000-01-01 03:23:34.234')");
+        stmt.execute("insert into TEST_QUERY_DB_TABLE (id, name, scale, created_on) VALUES (2, NULL, 2.0, '2010-01-01 00:00:00')");
+
+        runner.setProperty(GenerateTableFetch.TABLE_NAME, "TEST_QUERY_DB_TABLE");
+        runner.setIncomingConnection(false);
+        runner.setProperty(GenerateTableFetch.MAX_VALUE_COLUMN_NAMES, "ID");
+        runner.setProperty("initial.maxvalue.ID", "${maxval.id}");
+        runner.setVariable("maxval.id", "1");
+
+        runner.run();
+        runner.assertAllFlowFilesTransferred(REL_SUCCESS, 1);
+        MockFlowFile flowFile = runner.getFlowFilesForRelationship(REL_SUCCESS).get(0);
+        String query = new String(flowFile.toByteArray());
+        assertEquals("SELECT * FROM TEST_QUERY_DB_TABLE WHERE ID > 1 AND ID <= 2 ORDER BY ID FETCH NEXT 10000 ROWS ONLY", query);
+        ResultSet resultSet = stmt.executeQuery(query);
+        // Should be one record (the initial max value skips the first two)
+        assertTrue(resultSet.next());
+        assertFalse(resultSet.next());
+        runner.clearTransferState();
+
+        // Run again, this time no flowfiles/rows should be transferred
+        runner.run();
+        runner.assertAllFlowFilesTransferred(REL_SUCCESS, 0);
+        runner.clearTransferState();
+    }
+
+    @Test
+    public void testInitialMaxValueWithELAndIncoming() throws ClassNotFoundException, SQLException, InitializationException, IOException {
+
+        // load test data to database
+        final Connection con = ((DBCPService) runner.getControllerService("dbcp")).getConnection();
+        Statement stmt = con.createStatement();
+
+        try {
+            stmt.execute("drop table TEST_QUERY_DB_TABLE");
+        } catch (final SQLException sqle) {
+            // Ignore this error, probably a "table does not exist" since Derby doesn't yet support DROP IF EXISTS [DERBY-4842]
+        }
+
+        stmt.execute("create table TEST_QUERY_DB_TABLE (id integer not null, name varchar(100), scale float, created_on timestamp, bignum bigint default 0)");
+        stmt.execute("insert into TEST_QUERY_DB_TABLE (id, name, scale, created_on) VALUES (0, 'Joe Smith', 1.0, '1962-09-23 03:23:34.234')");
+        stmt.execute("insert into TEST_QUERY_DB_TABLE (id, name, scale, created_on) VALUES (1, 'Carrie Jones', 5.0, '2000-01-01 03:23:34.234')");
+        stmt.execute("insert into TEST_QUERY_DB_TABLE (id, name, scale, created_on) VALUES (2, NULL, 2.0, '2010-01-01 00:00:00')");
+
+        runner.setProperty(GenerateTableFetch.TABLE_NAME, "TEST_QUERY_DB_TABLE");
+        runner.setProperty(GenerateTableFetch.MAX_VALUE_COLUMN_NAMES, "ID");
+        runner.setProperty("initial.maxvalue.ID", "${maxval.id}");
+        Map<String,String> attrs = new HashMap<String,String>() {{
+            put("maxval.id", "1");
+        }};
+        runner.setIncomingConnection(true);
+        runner.enqueue(new byte[0], attrs);
+        runner.run();
+        runner.assertAllFlowFilesTransferred(REL_SUCCESS, 1);
+        MockFlowFile flowFile = runner.getFlowFilesForRelationship(REL_SUCCESS).get(0);
+        String query = new String(flowFile.toByteArray());
+        assertEquals("SELECT * FROM TEST_QUERY_DB_TABLE WHERE ID > 1 AND ID <= 2 ORDER BY ID FETCH NEXT 10000 ROWS ONLY", query);
+        ResultSet resultSet = stmt.executeQuery(query);
+        // Should be one record (the initial max value skips the first two)
+        assertTrue(resultSet.next());
+        assertFalse(resultSet.next());
+        runner.clearTransferState();
+
+        // Run again, this time no flowfiles/rows should be transferred
+        runner.enqueue(new byte[0], attrs);
+        runner.run();
+        runner.assertAllFlowFilesTransferred(REL_SUCCESS, 0);
+        runner.clearTransferState();
+    }
+
+    @Test
+    public void testInitialMaxValueWithELAndMultipleTables() throws ClassNotFoundException, SQLException, InitializationException, IOException {
+
+        // load test data to database
+        final Connection con = ((DBCPService) runner.getControllerService("dbcp")).getConnection();
+        Statement stmt = con.createStatement();
+
+        try {
+            stmt.execute("drop table TEST_QUERY_DB_TABLE");
+        } catch (final SQLException sqle) {
+            // Ignore this error, probably a "table does not exist" since Derby doesn't yet support DROP IF EXISTS [DERBY-4842]
+        }
+
+        stmt.execute("create table TEST_QUERY_DB_TABLE (id integer not null, name varchar(100), scale float, created_on timestamp, bignum bigint default 0)");
+        stmt.execute("insert into TEST_QUERY_DB_TABLE (id, name, scale, created_on) VALUES (0, 'Joe Smith', 1.0, '1962-09-23 03:23:34.234')");
+        stmt.execute("insert into TEST_QUERY_DB_TABLE (id, name, scale, created_on) VALUES (1, 'Carrie Jones', 5.0, '2000-01-01 03:23:34.234')");
+        stmt.execute("insert into TEST_QUERY_DB_TABLE (id, name, scale, created_on) VALUES (2, NULL, 2.0, '2010-01-01 00:00:00')");
+
+        runner.setProperty(GenerateTableFetch.TABLE_NAME, "${table.name}");
+        runner.setProperty(GenerateTableFetch.MAX_VALUE_COLUMN_NAMES, "ID");
+        runner.setProperty("initial.maxvalue.ID", "${maxval.id}");
+        Map<String,String> attrs = new HashMap<String,String>() {{
+            put("maxval.id", "1");
+            put("table.name", "TEST_QUERY_DB_TABLE");
+        }};
+        runner.setIncomingConnection(true);
+        runner.enqueue(new byte[0], attrs);
+        runner.run();
+        runner.assertAllFlowFilesTransferred(REL_SUCCESS, 1);
+        MockFlowFile flowFile = runner.getFlowFilesForRelationship(REL_SUCCESS).get(0);
+        String query = new String(flowFile.toByteArray());
+        assertEquals("SELECT * FROM TEST_QUERY_DB_TABLE WHERE ID > 1 AND ID <= 2 ORDER BY ID FETCH NEXT 10000 ROWS ONLY", query);
+        ResultSet resultSet = stmt.executeQuery(query);
+        // Should be one record (the initial max value skips the first two)
+        assertTrue(resultSet.next());
+        assertFalse(resultSet.next());
+        runner.clearTransferState();
+
+        // Run again, this time no flowfiles/rows should be transferred
+        runner.enqueue(new byte[0], attrs);
+        runner.run();
+        runner.assertAllFlowFilesTransferred(REL_SUCCESS, 0);
+        runner.clearTransferState();
+
+        // Initial Max Value for second table
+        try {
+            stmt.execute("drop table TEST_QUERY_DB_TABLE2");
+        } catch (final SQLException sqle) {
+            // Ignore this error, probably a "table does not exist" since Derby doesn't yet support DROP IF EXISTS [DERBY-4842]
+        }
+
+        stmt.execute("create table TEST_QUERY_DB_TABLE2 (id integer not null, name varchar(100), scale float, created_on timestamp, bignum bigint default 0)");
+        stmt.execute("insert into TEST_QUERY_DB_TABLE2 (id, name, scale, created_on) VALUES (0, 'Joe Smith', 1.0, '1962-09-23 03:23:34.234')");
+        stmt.execute("insert into TEST_QUERY_DB_TABLE2 (id, name, scale, created_on) VALUES (1, 'Carrie Jones', 5.0, '2000-01-01 03:23:34.234')");
+        stmt.execute("insert into TEST_QUERY_DB_TABLE2 (id, name, scale, created_on) VALUES (2, NULL, 2.0, '2010-01-01 00:00:00')");
+
+        attrs.put("table.name", "TEST_QUERY_DB_TABLE2");
+        runner.enqueue(new byte[0], attrs);
+        runner.run();
+        runner.assertAllFlowFilesTransferred(REL_SUCCESS, 1);
+        flowFile = runner.getFlowFilesForRelationship(REL_SUCCESS).get(0);
+        query = new String(flowFile.toByteArray());
+        assertEquals("SELECT * FROM TEST_QUERY_DB_TABLE2 WHERE ID > 1 AND ID <= 2 ORDER BY ID FETCH NEXT 10000 ROWS ONLY", query);
+        resultSet = stmt.executeQuery(query);
+        // Should be one record (the initial max value skips the first two)
+        assertTrue(resultSet.next());
+        assertFalse(resultSet.next());
+        runner.clearTransferState();
+
+        // Run again, this time no flowfiles/rows should be transferred
+        runner.enqueue(new byte[0], attrs);
+        runner.run();
+        runner.assertAllFlowFilesTransferred(REL_SUCCESS, 0);
+        runner.clearTransferState();
+    }
+
+    @Test
     public void testNoDuplicateWithRightBounded() throws ClassNotFoundException, SQLException, InitializationException, IOException {
 
         // load test data to database