You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ma...@apache.org on 2017/09/07 15:13:36 UTC
nifi git commit: NIFI-3484: GenerateTableFetch Should Allow for Right
Boundary
Repository: nifi
Updated Branches:
refs/heads/master 527ce0b4e -> ae30c7f35
NIFI-3484: GenerateTableFetch Should Allow for Right Boundary
fix checkstyle issue, and added unit test showing data duplication issue, removed property
Signed-off-by: Matthew Burgess <ma...@apache.org>
This closes #2091
Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/ae30c7f3
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/ae30c7f3
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/ae30c7f3
Branch: refs/heads/master
Commit: ae30c7f35013e1faf26c6bd3af122362fa4b361e
Parents: 527ce0b
Author: patricker <pa...@gmail.com>
Authored: Wed Feb 15 12:11:56 2017 -0700
Committer: Matthew Burgess <ma...@apache.org>
Committed: Thu Sep 7 11:05:38 2017 -0400
----------------------------------------------------------------------
.../processors/standard/GenerateTableFetch.java | 62 +++-
.../standard/TestGenerateTableFetch.java | 283 +++++++++++++++++--
2 files changed, 309 insertions(+), 36 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/nifi/blob/ae30c7f3/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GenerateTableFetch.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GenerateTableFetch.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GenerateTableFetch.java
index f5407da..3db2782 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GenerateTableFetch.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GenerateTableFetch.java
@@ -237,23 +237,12 @@ public class GenerateTableFetch extends AbstractDatabaseFetchProcessor {
// For each maximum-value column, get a WHERE filter and a MAX(column) alias
IntStream.range(0, maxValueColumnNameList.size()).forEach((index) -> {
String colName = maxValueColumnNameList.get(index);
+
maxValueSelectColumns.add("MAX(" + colName + ") " + colName);
- final String fullyQualifiedStateKey = getStateKey(tableName, colName);
- String maxValue = statePropertyMap.get(fullyQualifiedStateKey);
- if (StringUtils.isEmpty(maxValue) && !isDynamicTableName) {
- // If the table name is static and the fully-qualified key was not found, try just the column name
- maxValue = statePropertyMap.get(getStateKey(null, colName));
- }
+ String maxValue = getColumnStateMaxValue(tableName, statePropertyMap, colName);
if (!StringUtils.isEmpty(maxValue)) {
- Integer type = columnTypeMap.get(fullyQualifiedStateKey);
- if (type == null && !isDynamicTableName) {
- // If the table name is static and the fully-qualified key was not found, try just the column name
- type = columnTypeMap.get(getStateKey(null, colName));
- }
- if (type == null) {
- // This shouldn't happen as we are populating columnTypeMap when the processor is scheduled or when the first maximum is observed
- throw new IllegalArgumentException("No column type found for: " + colName);
- }
+ Integer type = getColumnType(tableName, colName);
+
// Add a condition for the WHERE clause
maxValueClauses.add(colName + (index == 0 ? " > " : " >= ") + getLiteralByType(type, maxValue, dbAdapter.getName()));
}
@@ -318,6 +307,23 @@ public class GenerateTableFetch extends AbstractDatabaseFetchProcessor {
throw new SQLException("No rows returned from metadata query: " + selectQuery);
}
+ // for each maximum-value column get a right bounding WHERE condition
+ IntStream.range(0, maxValueColumnNameList.size()).forEach((index) -> {
+ String colName = maxValueColumnNameList.get(index);
+
+ maxValueSelectColumns.add("MAX(" + colName + ") " + colName);
+ String maxValue = getColumnStateMaxValue(tableName, statePropertyMap, colName);
+ if (!StringUtils.isEmpty(maxValue)) {
+ Integer type = getColumnType(tableName, colName);
+
+ // Add a condition for the WHERE clause
+ maxValueClauses.add(colName + " <= " + getLiteralByType(type, maxValue, dbAdapter.getName()));
+ }
+ });
+
+ //Update WHERE list to include new right hand boundaries
+ whereClause = StringUtils.join(maxValueClauses, " AND ");
+
final long numberOfFetches = (partitionSize == 0) ? 1 : (rowCount / partitionSize) + (rowCount % partitionSize == 0 ? 0 : 1);
// Generate SQL statements to read "pages" of data
@@ -377,4 +383,30 @@ public class GenerateTableFetch extends AbstractDatabaseFetchProcessor {
context.yield();
}
}
+
+ private String getColumnStateMaxValue(String tableName, Map<String, String> statePropertyMap, String colName) {
+ final String fullyQualifiedStateKey = getStateKey(tableName, colName);
+ String maxValue = statePropertyMap.get(fullyQualifiedStateKey);
+ if (StringUtils.isEmpty(maxValue) && !isDynamicTableName) {
+ // If the table name is static and the fully-qualified key was not found, try just the column name
+ maxValue = statePropertyMap.get(getStateKey(null, colName));
+ }
+
+ return maxValue;
+ }
+
+ private Integer getColumnType(String tableName, String colName) {
+ final String fullyQualifiedStateKey = getStateKey(tableName, colName);
+ Integer type = columnTypeMap.get(fullyQualifiedStateKey);
+ if (type == null && !isDynamicTableName) {
+ // If the table name is static and the fully-qualified key was not found, try just the column name
+ type = columnTypeMap.get(getStateKey(null, colName));
+ }
+ if (type == null) {
+ // This shouldn't happen as we are populating columnTypeMap when the processor is scheduled or when the first maximum is observed
+ throw new IllegalArgumentException("No column type found for: " + colName);
+ }
+
+ return type;
+ }
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/ae30c7f3/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestGenerateTableFetch.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestGenerateTableFetch.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestGenerateTableFetch.java
index 76bc1f0..8d549fd 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestGenerateTableFetch.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestGenerateTableFetch.java
@@ -139,7 +139,7 @@ public class TestGenerateTableFetch {
runner.assertAllFlowFilesTransferred(REL_SUCCESS, 1);
MockFlowFile flowFile = runner.getFlowFilesForRelationship(REL_SUCCESS).get(0);
String query = new String(flowFile.toByteArray());
- assertEquals("SELECT * FROM TEST_QUERY_DB_TABLE ORDER BY ID FETCH NEXT 10000 ROWS ONLY", query);
+ assertEquals("SELECT * FROM TEST_QUERY_DB_TABLE WHERE ID <= 2 ORDER BY ID FETCH NEXT 10000 ROWS ONLY", query);
ResultSet resultSet = stmt.executeQuery(query);
// Should be three records
assertTrue(resultSet.next());
@@ -164,7 +164,7 @@ public class TestGenerateTableFetch {
// Verify first flow file's contents
flowFile = runner.getFlowFilesForRelationship(REL_SUCCESS).get(0);
query = new String(flowFile.toByteArray());
- assertEquals("SELECT * FROM TEST_QUERY_DB_TABLE WHERE ID > 2 ORDER BY ID FETCH NEXT 2 ROWS ONLY", query);
+ assertEquals("SELECT * FROM TEST_QUERY_DB_TABLE WHERE ID > 2 AND ID <= 5 ORDER BY ID FETCH NEXT 2 ROWS ONLY", query);
resultSet = stmt.executeQuery(query);
// Should be two records
assertTrue(resultSet.next());
@@ -174,7 +174,7 @@ public class TestGenerateTableFetch {
// Verify second flow file's contents
flowFile = runner.getFlowFilesForRelationship(REL_SUCCESS).get(1);
query = new String(flowFile.toByteArray());
- assertEquals("SELECT * FROM TEST_QUERY_DB_TABLE WHERE ID > 2 ORDER BY ID OFFSET 2 ROWS FETCH NEXT 2 ROWS ONLY", query);
+ assertEquals("SELECT * FROM TEST_QUERY_DB_TABLE WHERE ID > 2 AND ID <= 5 ORDER BY ID OFFSET 2 ROWS FETCH NEXT 2 ROWS ONLY", query);
resultSet = stmt.executeQuery(query);
// Should be one record
assertTrue(resultSet.next());
@@ -187,7 +187,7 @@ public class TestGenerateTableFetch {
runner.assertAllFlowFilesTransferred(REL_SUCCESS, 1);
flowFile = runner.getFlowFilesForRelationship(REL_SUCCESS).get(0);
query = new String(flowFile.toByteArray());
- assertEquals("SELECT * FROM TEST_QUERY_DB_TABLE WHERE ID > 5 ORDER BY ID FETCH NEXT 2 ROWS ONLY", query);
+ assertEquals("SELECT * FROM TEST_QUERY_DB_TABLE WHERE ID > 5 AND ID <= 6 ORDER BY ID FETCH NEXT 2 ROWS ONLY", query);
resultSet = stmt.executeQuery(query);
// Should be one record
assertTrue(resultSet.next());
@@ -201,16 +201,16 @@ public class TestGenerateTableFetch {
runner.run();
runner.assertAllFlowFilesTransferred(REL_SUCCESS, 4); // 7 records with partition size 2 means 4 generated FlowFiles
flowFile = runner.getFlowFilesForRelationship(REL_SUCCESS).get(0);
- assertEquals("SELECT id, name, scale, created_on FROM TEST_QUERY_DB_TABLE ORDER BY name FETCH NEXT 2 ROWS ONLY", new String(flowFile.toByteArray()));
+ assertEquals("SELECT id, name, scale, created_on FROM TEST_QUERY_DB_TABLE WHERE name <= 'Mr. NiFi' ORDER BY name FETCH NEXT 2 ROWS ONLY", new String(flowFile.toByteArray()));
flowFile = runner.getFlowFilesForRelationship(REL_SUCCESS).get(1);
- assertEquals("SELECT id, name, scale, created_on FROM TEST_QUERY_DB_TABLE ORDER BY name OFFSET 2 ROWS FETCH NEXT 2 ROWS ONLY", new String(flowFile.toByteArray()));
+ assertEquals("SELECT id, name, scale, created_on FROM TEST_QUERY_DB_TABLE WHERE name <= 'Mr. NiFi' ORDER BY name OFFSET 2 ROWS FETCH NEXT 2 ROWS ONLY", new String(flowFile.toByteArray()));
flowFile = runner.getFlowFilesForRelationship(REL_SUCCESS).get(2);
- assertEquals("SELECT id, name, scale, created_on FROM TEST_QUERY_DB_TABLE ORDER BY name OFFSET 4 ROWS FETCH NEXT 2 ROWS ONLY", new String(flowFile.toByteArray()));
+ assertEquals("SELECT id, name, scale, created_on FROM TEST_QUERY_DB_TABLE WHERE name <= 'Mr. NiFi' ORDER BY name OFFSET 4 ROWS FETCH NEXT 2 ROWS ONLY", new String(flowFile.toByteArray()));
flowFile = runner.getFlowFilesForRelationship(REL_SUCCESS).get(3);
- assertEquals("SELECT id, name, scale, created_on FROM TEST_QUERY_DB_TABLE ORDER BY name OFFSET 6 ROWS FETCH NEXT 2 ROWS ONLY", new String(flowFile.toByteArray()));
+ assertEquals("SELECT id, name, scale, created_on FROM TEST_QUERY_DB_TABLE WHERE name <= 'Mr. NiFi' ORDER BY name OFFSET 6 ROWS FETCH NEXT 2 ROWS ONLY", new String(flowFile.toByteArray()));
assertEquals("TEST_QUERY_DB_TABLE", flowFile.getAttribute("generatetablefetch.tableName"));
assertEquals("id, name, scale, created_on", flowFile.getAttribute("generatetablefetch.columnNames"));
- assertEquals(null, flowFile.getAttribute("generatetablefetch.whereClause"));
+ assertEquals("name <= 'Mr. NiFi'", flowFile.getAttribute("generatetablefetch.whereClause"));
assertEquals("name", flowFile.getAttribute("generatetablefetch.maxColumnNames"));
assertEquals("2", flowFile.getAttribute("generatetablefetch.limit"));
assertEquals("6", flowFile.getAttribute("generatetablefetch.offset"));
@@ -219,6 +219,190 @@ public class TestGenerateTableFetch {
}
@Test
+ public void testAddedRowsRightBounded() throws ClassNotFoundException, SQLException, InitializationException, IOException {
+
+ // load test data to database
+ final Connection con = ((DBCPService) runner.getControllerService("dbcp")).getConnection();
+ Statement stmt = con.createStatement();
+
+ try {
+ stmt.execute("drop table TEST_QUERY_DB_TABLE");
+ } catch (final SQLException sqle) {
+ // Ignore this error, probably a "table does not exist" since Derby doesn't yet support DROP IF EXISTS [DERBY-4842]
+ }
+
+ stmt.execute("create table TEST_QUERY_DB_TABLE (id integer not null, name varchar(100), scale float, created_on timestamp, bignum bigint default 0)");
+ stmt.execute("insert into TEST_QUERY_DB_TABLE (id, name, scale, created_on) VALUES (0, 'Joe Smith', 1.0, '1962-09-23 03:23:34.234')");
+ stmt.execute("insert into TEST_QUERY_DB_TABLE (id, name, scale, created_on) VALUES (1, 'Carrie Jones', 5.0, '2000-01-01 03:23:34.234')");
+ stmt.execute("insert into TEST_QUERY_DB_TABLE (id, name, scale, created_on) VALUES (2, NULL, 2.0, '2010-01-01 00:00:00')");
+
+ runner.setProperty(GenerateTableFetch.TABLE_NAME, "TEST_QUERY_DB_TABLE");
+ runner.setIncomingConnection(false);
+ runner.setProperty(GenerateTableFetch.MAX_VALUE_COLUMN_NAMES, "ID");
+
+ runner.run();
+ runner.assertAllFlowFilesTransferred(REL_SUCCESS, 1);
+ MockFlowFile flowFile = runner.getFlowFilesForRelationship(REL_SUCCESS).get(0);
+ String query = new String(flowFile.toByteArray());
+ assertEquals("SELECT * FROM TEST_QUERY_DB_TABLE WHERE ID <= 2 ORDER BY ID FETCH NEXT 10000 ROWS ONLY", query);
+ ResultSet resultSet = stmt.executeQuery(query);
+ // Should be three records
+ assertTrue(resultSet.next());
+ assertTrue(resultSet.next());
+ assertTrue(resultSet.next());
+ assertFalse(resultSet.next());
+ runner.clearTransferState();
+
+ // Run again, this time no flowfiles/rows should be transferred
+ runner.run();
+ runner.assertAllFlowFilesTransferred(REL_SUCCESS, 0);
+ runner.clearTransferState();
+
+ // Add 3 new rows with a higher ID and run with a partition size of 2. Two flow files should be transferred
+ stmt.execute("insert into TEST_QUERY_DB_TABLE (id, name, scale, created_on) VALUES (3, 'Mary West', 15.0, '2000-01-01 03:23:34.234')");
+ stmt.execute("insert into TEST_QUERY_DB_TABLE (id, name, scale, created_on) VALUES (4, 'Marty Johnson', 15.0, '2011-01-01 03:23:34.234')");
+ stmt.execute("insert into TEST_QUERY_DB_TABLE (id, name, scale, created_on) VALUES (5, 'Marty Johnson', 15.0, '2011-01-01 03:23:34.234')");
+ runner.setProperty(GenerateTableFetch.PARTITION_SIZE, "2");
+ runner.run();
+ runner.assertAllFlowFilesTransferred(REL_SUCCESS, 2);
+
+ // Verify first flow file's contents
+ flowFile = runner.getFlowFilesForRelationship(REL_SUCCESS).get(0);
+ query = new String(flowFile.toByteArray());
+ assertEquals("SELECT * FROM TEST_QUERY_DB_TABLE WHERE ID > 2 AND ID <= 5 ORDER BY ID FETCH NEXT 2 ROWS ONLY", query);
+ resultSet = stmt.executeQuery(query);
+ // Should be two records
+ assertTrue(resultSet.next());
+ assertTrue(resultSet.next());
+ assertFalse(resultSet.next());
+
+ // Verify second flow file's contents
+ flowFile = runner.getFlowFilesForRelationship(REL_SUCCESS).get(1);
+ query = new String(flowFile.toByteArray());
+ assertEquals("SELECT * FROM TEST_QUERY_DB_TABLE WHERE ID > 2 AND ID <= 5 ORDER BY ID OFFSET 2 ROWS FETCH NEXT 2 ROWS ONLY", query);
+ resultSet = stmt.executeQuery(query);
+ // Should be one record
+ assertTrue(resultSet.next());
+ assertFalse(resultSet.next());
+ runner.clearTransferState();
+
+ // Add a new row with a higher ID and run, one flow file will be transferred
+ stmt.execute("insert into TEST_QUERY_DB_TABLE (id, name, scale, created_on) VALUES (6, 'Mr. NiFi', 1.0, '2012-01-01 03:23:34.234')");
+ runner.run();
+ runner.assertAllFlowFilesTransferred(REL_SUCCESS, 1);
+ flowFile = runner.getFlowFilesForRelationship(REL_SUCCESS).get(0);
+ query = new String(flowFile.toByteArray());
+ assertEquals("SELECT * FROM TEST_QUERY_DB_TABLE WHERE ID > 5 AND ID <= 6 ORDER BY ID FETCH NEXT 2 ROWS ONLY", query);
+ resultSet = stmt.executeQuery(query);
+ // Should be one record
+ assertTrue(resultSet.next());
+ assertFalse(resultSet.next());
+ runner.clearTransferState();
+
+ // Set name as the max value column name (and clear the state), all rows should be returned since the max value for name has not been set
+ runner.getStateManager().clear(Scope.CLUSTER);
+ runner.setProperty(GenerateTableFetch.MAX_VALUE_COLUMN_NAMES, "name");
+ runner.run();
+ runner.assertAllFlowFilesTransferred(REL_SUCCESS, 4); // 7 records with partition size 2 means 4 generated FlowFiles
+ flowFile = runner.getFlowFilesForRelationship(REL_SUCCESS).get(0);
+ assertEquals("SELECT * FROM TEST_QUERY_DB_TABLE WHERE name <= 'Mr. NiFi' ORDER BY name FETCH NEXT 2 ROWS ONLY", new String(flowFile.toByteArray()));
+ flowFile = runner.getFlowFilesForRelationship(REL_SUCCESS).get(1);
+ assertEquals("SELECT * FROM TEST_QUERY_DB_TABLE WHERE name <= 'Mr. NiFi' ORDER BY name OFFSET 2 ROWS FETCH NEXT 2 ROWS ONLY", new String(flowFile.toByteArray()));
+ flowFile = runner.getFlowFilesForRelationship(REL_SUCCESS).get(2);
+ assertEquals("SELECT * FROM TEST_QUERY_DB_TABLE WHERE name <= 'Mr. NiFi' ORDER BY name OFFSET 4 ROWS FETCH NEXT 2 ROWS ONLY", new String(flowFile.toByteArray()));
+ flowFile = runner.getFlowFilesForRelationship(REL_SUCCESS).get(3);
+ assertEquals("SELECT * FROM TEST_QUERY_DB_TABLE WHERE name <= 'Mr. NiFi' ORDER BY name OFFSET 6 ROWS FETCH NEXT 2 ROWS ONLY", new String(flowFile.toByteArray()));
+
+ runner.clearTransferState();
+ }
+
+ @Test
+ public void testAddedRowsTimestampRightBounded() throws ClassNotFoundException, SQLException, InitializationException, IOException {
+
+ // load test data to database
+ final Connection con = ((DBCPService) runner.getControllerService("dbcp")).getConnection();
+ Statement stmt = con.createStatement();
+
+ try {
+ stmt.execute("drop table TEST_QUERY_DB_TABLE");
+ } catch (final SQLException sqle) {
+ // Ignore this error, probably a "table does not exist" since Derby doesn't yet support DROP IF EXISTS [DERBY-4842]
+ }
+
+ stmt.execute("create table TEST_QUERY_DB_TABLE (id integer not null, name varchar(100), scale float, created_on timestamp, bignum bigint default 0)");
+ stmt.execute("insert into TEST_QUERY_DB_TABLE (id, name, scale, created_on) VALUES (0, 'Joe Smith', 1.0, '1962-09-23 03:23:34.234')");
+ stmt.execute("insert into TEST_QUERY_DB_TABLE (id, name, scale, created_on) VALUES (1, 'Carrie Jones', 5.0, '2000-01-01 03:23:34.234')");
+ stmt.execute("insert into TEST_QUERY_DB_TABLE (id, name, scale, created_on) VALUES (2, NULL, 2.0, '2010-01-01 00:00:00')");
+
+ runner.setProperty(GenerateTableFetch.TABLE_NAME, "TEST_QUERY_DB_TABLE");
+ runner.setIncomingConnection(false);
+ runner.setProperty(GenerateTableFetch.MAX_VALUE_COLUMN_NAMES, "created_on");
+
+ runner.run();
+ runner.assertAllFlowFilesTransferred(REL_SUCCESS, 1);
+ MockFlowFile flowFile = runner.getFlowFilesForRelationship(REL_SUCCESS).get(0);
+ String query = new String(flowFile.toByteArray());
+ assertEquals("SELECT * FROM TEST_QUERY_DB_TABLE WHERE created_on <= '2010-01-01 00:00:00.0' ORDER BY created_on FETCH NEXT 10000 ROWS ONLY", query);
+ ResultSet resultSet = stmt.executeQuery(query);
+ // Should be three records
+ assertTrue(resultSet.next());
+ assertTrue(resultSet.next());
+ assertTrue(resultSet.next());
+ assertFalse(resultSet.next());
+ runner.clearTransferState();
+
+ // Run again, this time no flowfiles/rows should be transferred
+ runner.run();
+ runner.assertAllFlowFilesTransferred(REL_SUCCESS, 0);
+ runner.clearTransferState();
+
+ // Add 5 new rows, 3 with higher timestamps, 2 with a lower timestamp.
+ stmt.execute("insert into TEST_QUERY_DB_TABLE (id, name, scale, created_on) VALUES (3, 'Mary West', 15.0, '2000-01-01 03:23:34.234')");
+ stmt.execute("insert into TEST_QUERY_DB_TABLE (id, name, scale, created_on) VALUES (4, 'Mary West', 15.0, '2000-01-01 03:23:34.234')");
+ stmt.execute("insert into TEST_QUERY_DB_TABLE (id, name, scale, created_on) VALUES (5, 'Marty Johnson', 15.0, '2011-01-01 02:23:34.234')");
+ stmt.execute("insert into TEST_QUERY_DB_TABLE (id, name, scale, created_on) VALUES (6, 'Marty Johnson', 15.0, '2011-01-01 03:23:34.234')");
+ stmt.execute("insert into TEST_QUERY_DB_TABLE (id, name, scale, created_on) VALUES (7, 'James Johnson', 16.0, '2011-01-01 04:23:34.236')");
+ runner.setProperty(GenerateTableFetch.PARTITION_SIZE, "2");
+ runner.run();
+ runner.assertAllFlowFilesTransferred(REL_SUCCESS, 2);
+
+ // Verify first flow file's contents
+ flowFile = runner.getFlowFilesForRelationship(REL_SUCCESS).get(0);
+ query = new String(flowFile.toByteArray());
+ assertEquals("SELECT * FROM TEST_QUERY_DB_TABLE WHERE created_on > '2010-01-01 00:00:00.0' AND "
+ + "created_on <= '2011-01-01 04:23:34.236' ORDER BY created_on FETCH NEXT 2 ROWS ONLY", query);
+ resultSet = stmt.executeQuery(query);
+ // Should be two records
+ assertTrue(resultSet.next());
+ assertTrue(resultSet.next());
+ assertFalse(resultSet.next());
+
+ // Verify second flow file's contents
+ flowFile = runner.getFlowFilesForRelationship(REL_SUCCESS).get(1);
+ query = new String(flowFile.toByteArray());
+ assertEquals("SELECT * FROM TEST_QUERY_DB_TABLE WHERE created_on > '2010-01-01 00:00:00.0' AND "
+ + "created_on <= '2011-01-01 04:23:34.236' ORDER BY created_on OFFSET 2 ROWS FETCH NEXT 2 ROWS ONLY", query);
+ resultSet = stmt.executeQuery(query);
+ // Should be one record
+ assertTrue(resultSet.next());
+ assertFalse(resultSet.next());
+ runner.clearTransferState();
+
+ // Add a new row with a higher created_on and run, one flow file will be transferred
+ stmt.execute("insert into TEST_QUERY_DB_TABLE (id, name, scale, created_on) VALUES (8, 'Mr. NiFi', 1.0, '2012-01-01 03:23:34.234')");
+ runner.run();
+ runner.assertAllFlowFilesTransferred(REL_SUCCESS, 1);
+ flowFile = runner.getFlowFilesForRelationship(REL_SUCCESS).get(0);
+ query = new String(flowFile.toByteArray());
+ assertEquals("SELECT * FROM TEST_QUERY_DB_TABLE WHERE created_on > '2011-01-01 04:23:34.236' AND created_on <= '2012-01-01 03:23:34.234' ORDER BY created_on FETCH NEXT 2 ROWS ONLY", query);
+ resultSet = stmt.executeQuery(query);
+ // Should be one record
+ assertTrue(resultSet.next());
+ assertFalse(resultSet.next());
+ runner.clearTransferState();
+ }
+
+ @Test
public void testOnePartition() throws ClassNotFoundException, SQLException, InitializationException, IOException {
// load test data to database
@@ -244,7 +428,7 @@ public class TestGenerateTableFetch {
runner.run();
runner.assertAllFlowFilesTransferred(GenerateTableFetch.REL_SUCCESS, 1);
- runner.getFlowFilesForRelationship(GenerateTableFetch.REL_SUCCESS).get(0).assertContentEquals("SELECT * FROM TEST_QUERY_DB_TABLE ORDER BY ID");
+ runner.getFlowFilesForRelationship(GenerateTableFetch.REL_SUCCESS).get(0).assertContentEquals("SELECT * FROM TEST_QUERY_DB_TABLE WHERE ID <= 2 ORDER BY ID");
runner.clearTransferState();
}
@@ -410,7 +594,7 @@ public class TestGenerateTableFetch {
runner.assertAllFlowFilesTransferred(REL_SUCCESS, 1);
MockFlowFile flowFile = runner.getFlowFilesForRelationship(REL_SUCCESS).get(0);
- assertEquals("SELECT * FROM TEST_QUERY_DB_TABLE WHERE id > 0 ORDER BY id FETCH NEXT 10000 ROWS ONLY", new String(flowFile.toByteArray()));
+ assertEquals("SELECT * FROM TEST_QUERY_DB_TABLE WHERE id > 0 AND id <= 1 ORDER BY id FETCH NEXT 10000 ROWS ONLY", new String(flowFile.toByteArray()));
}
@Test
@@ -451,10 +635,10 @@ public class TestGenerateTableFetch {
runner.assertAllFlowFilesTransferred(REL_SUCCESS, 1);
MockFlowFile flowFile = runner.getFlowFilesForRelationship(REL_SUCCESS).get(0);
// Note there is no WHERE clause here. Because we are using dynamic tables, the old state key/value is not retrieved
- assertEquals("SELECT * FROM TEST_QUERY_DB_TABLE ORDER BY id FETCH NEXT 10000 ROWS ONLY", new String(flowFile.toByteArray()));
+ assertEquals("SELECT * FROM TEST_QUERY_DB_TABLE WHERE id <= 1 ORDER BY id FETCH NEXT 10000 ROWS ONLY", new String(flowFile.toByteArray()));
assertEquals("TEST_QUERY_DB_TABLE", flowFile.getAttribute("generatetablefetch.tableName"));
assertEquals(null, flowFile.getAttribute("generatetablefetch.columnNames"));
- assertEquals(null, flowFile.getAttribute("generatetablefetch.whereClause"));
+ assertEquals("id <= 1", flowFile.getAttribute("generatetablefetch.whereClause"));
assertEquals("id", flowFile.getAttribute("generatetablefetch.maxColumnNames"));
assertEquals("10000", flowFile.getAttribute("generatetablefetch.limit"));
assertEquals("0", flowFile.getAttribute("generatetablefetch.offset"));
@@ -470,10 +654,10 @@ public class TestGenerateTableFetch {
runner.assertAllFlowFilesTransferred(REL_SUCCESS, 1);
flowFile = runner.getFlowFilesForRelationship(REL_SUCCESS).get(0);
- assertEquals("SELECT * FROM TEST_QUERY_DB_TABLE WHERE id > 1 ORDER BY id FETCH NEXT 10000 ROWS ONLY", new String(flowFile.toByteArray()));
+ assertEquals("SELECT * FROM TEST_QUERY_DB_TABLE WHERE id > 1 AND id <= 2 ORDER BY id FETCH NEXT 10000 ROWS ONLY", new String(flowFile.toByteArray()));
assertEquals("TEST_QUERY_DB_TABLE", flowFile.getAttribute("generatetablefetch.tableName"));
assertEquals(null, flowFile.getAttribute("generatetablefetch.columnNames"));
- assertEquals("id > 1", flowFile.getAttribute("generatetablefetch.whereClause"));
+ assertEquals("id > 1 AND id <= 2", flowFile.getAttribute("generatetablefetch.whereClause"));
assertEquals("id", flowFile.getAttribute("generatetablefetch.maxColumnNames"));
assertEquals("10000", flowFile.getAttribute("generatetablefetch.limit"));
assertEquals("0", flowFile.getAttribute("generatetablefetch.offset"));
@@ -516,7 +700,7 @@ public class TestGenerateTableFetch {
runner.assertAllFlowFilesTransferred(REL_SUCCESS, 1);
MockFlowFile flowFile = runner.getFlowFilesForRelationship(REL_SUCCESS).get(0);
// Note there is no WHERE clause here. Because we are using dynamic tables, the old state key/value is not retrieved
- assertEquals("SELECT * FROM TEST_QUERY_DB_TABLE ORDER BY id FETCH NEXT 10000 ROWS ONLY", new String(flowFile.toByteArray()));
+ assertEquals("SELECT * FROM TEST_QUERY_DB_TABLE WHERE id <= 1 ORDER BY id FETCH NEXT 10000 ROWS ONLY", new String(flowFile.toByteArray()));
runner.clearTransferState();
stmt.execute("insert into TEST_QUERY_DB_TABLE (id, bucket) VALUES (2, 0)");
@@ -529,7 +713,7 @@ public class TestGenerateTableFetch {
runner.assertAllFlowFilesTransferred(REL_SUCCESS, 1);
flowFile = runner.getFlowFilesForRelationship(REL_SUCCESS).get(0);
- assertEquals("SELECT * FROM TEST_QUERY_DB_TABLE WHERE id > 1 ORDER BY id FETCH NEXT 10000 ROWS ONLY", new String(flowFile.toByteArray()));
+ assertEquals("SELECT * FROM TEST_QUERY_DB_TABLE WHERE id > 1 AND id <= 2 ORDER BY id FETCH NEXT 10000 ROWS ONLY", new String(flowFile.toByteArray()));
}
@Test
@@ -570,7 +754,7 @@ public class TestGenerateTableFetch {
MockFlowFile flowFile = runner.getFlowFilesForRelationship(REL_SUCCESS).get(0);
// Note there is no WHERE clause here. Because we are using dynamic tables (i.e. Expression Language,
// even when not referring to flow file attributes), the old state key/value is not retrieved
- assertEquals("SELECT * FROM TEST_QUERY_DB_TABLE ORDER BY id FETCH NEXT 10000 ROWS ONLY", new String(flowFile.toByteArray()));
+ assertEquals("SELECT * FROM TEST_QUERY_DB_TABLE WHERE id <= 1 ORDER BY id FETCH NEXT 10000 ROWS ONLY", new String(flowFile.toByteArray()));
}
@Test
@@ -641,7 +825,7 @@ public class TestGenerateTableFetch {
runner.assertAllFlowFilesTransferred(REL_SUCCESS, 1);
MockFlowFile flowFile = runner.getFlowFilesForRelationship(REL_SUCCESS).get(0);
String query = new String(flowFile.toByteArray());
- assertEquals("SELECT * FROM TEST_QUERY_DB_TABLE WHERE ID > 1 ORDER BY ID FETCH NEXT 10000 ROWS ONLY", query);
+ assertEquals("SELECT * FROM TEST_QUERY_DB_TABLE WHERE ID > 1 AND ID <= 2 ORDER BY ID FETCH NEXT 10000 ROWS ONLY", query);
ResultSet resultSet = stmt.executeQuery(query);
// Should be one record (the initial max value skips the first two)
assertTrue(resultSet.next());
@@ -665,7 +849,7 @@ public class TestGenerateTableFetch {
// Verify first flow file's contents
flowFile = runner.getFlowFilesForRelationship(REL_SUCCESS).get(0);
query = new String(flowFile.toByteArray());
- assertEquals("SELECT * FROM TEST_QUERY_DB_TABLE WHERE ID > 2 ORDER BY ID FETCH NEXT 2 ROWS ONLY", query);
+ assertEquals("SELECT * FROM TEST_QUERY_DB_TABLE WHERE ID > 2 AND ID <= 5 ORDER BY ID FETCH NEXT 2 ROWS ONLY", query);
resultSet = stmt.executeQuery(query);
// Should be two records
assertTrue(resultSet.next());
@@ -675,7 +859,7 @@ public class TestGenerateTableFetch {
// Verify second flow file's contents
flowFile = runner.getFlowFilesForRelationship(REL_SUCCESS).get(1);
query = new String(flowFile.toByteArray());
- assertEquals("SELECT * FROM TEST_QUERY_DB_TABLE WHERE ID > 2 ORDER BY ID OFFSET 2 ROWS FETCH NEXT 2 ROWS ONLY", query);
+ assertEquals("SELECT * FROM TEST_QUERY_DB_TABLE WHERE ID > 2 AND ID <= 5 ORDER BY ID OFFSET 2 ROWS FETCH NEXT 2 ROWS ONLY", query);
resultSet = stmt.executeQuery(query);
// Should be one record
assertTrue(resultSet.next());
@@ -683,6 +867,63 @@ public class TestGenerateTableFetch {
runner.clearTransferState();
}
+ @Test
+ public void testNoDuplicateWithRightBounded() throws ClassNotFoundException, SQLException, InitializationException, IOException {
+
+ // load test data to database
+ final Connection con = ((DBCPService) runner.getControllerService("dbcp")).getConnection();
+ Statement stmt = con.createStatement();
+
+ try {
+ stmt.execute("drop table TEST_QUERY_DB_TABLE");
+ } catch (final SQLException sqle) {
+ // Ignore this error, probably a "table does not exist" since Derby doesn't yet support DROP IF EXISTS [DERBY-4842]
+ }
+
+ stmt.execute("create table TEST_QUERY_DB_TABLE (id integer not null, name varchar(100), scale float, created_on timestamp, bignum bigint default 0)");
+ stmt.execute("insert into TEST_QUERY_DB_TABLE (id, name, scale, created_on) VALUES (0, 'Joe Smith', 1.0, '1962-09-23 03:23:34.234')");
+ stmt.execute("insert into TEST_QUERY_DB_TABLE (id, name, scale, created_on) VALUES (1, 'Carrie Jones', 5.0, '2000-01-01 03:23:34.234')");
+ stmt.execute("insert into TEST_QUERY_DB_TABLE (id, name, scale, created_on) VALUES (2, NULL, 2.0, '2010-01-01 00:00:00')");
+
+ runner.setProperty(GenerateTableFetch.TABLE_NAME, "TEST_QUERY_DB_TABLE");
+ runner.setIncomingConnection(false);
+ runner.setProperty(GenerateTableFetch.MAX_VALUE_COLUMN_NAMES, "ID");
+
+ runner.run();
+ runner.assertAllFlowFilesTransferred(REL_SUCCESS, 1);
+ MockFlowFile flowFile = runner.getFlowFilesForRelationship(REL_SUCCESS).get(0);
+ String query = new String(flowFile.toByteArray());
+
+ // we now insert rows before the query issued by GenerateTableFetch is actually executed by, let's say, an ExecuteSQL processor
+ stmt.execute("insert into TEST_QUERY_DB_TABLE (id, name, scale, created_on) VALUES (3, 'Mary West', 15.0, '2000-01-01 03:23:34.234')");
+ stmt.execute("insert into TEST_QUERY_DB_TABLE (id, name, scale, created_on) VALUES (4, 'Marty Johnson', 15.0, '2011-01-01 03:23:34.234')");
+ stmt.execute("insert into TEST_QUERY_DB_TABLE (id, name, scale, created_on) VALUES (5, 'Marty Johnson', 15.0, '2011-01-01 03:23:34.234')");
+
+ ResultSet resultSet = stmt.executeQuery(query);
+ int numberRecordsFirstExecution = 0; // Should be three records
+ while(resultSet.next()) {
+ numberRecordsFirstExecution++;
+ }
+ runner.clearTransferState();
+
+ // Run again
+ runner.run();
+ runner.assertAllFlowFilesTransferred(REL_SUCCESS, 1);
+ flowFile = runner.getFlowFilesForRelationship(REL_SUCCESS).get(0);
+ query = new String(flowFile.toByteArray());
+
+ resultSet = stmt.executeQuery(query);
+ int numberRecordsSecondExecution = 0; // Should be three records
+ while(resultSet.next()) {
+ numberRecordsSecondExecution++;
+ }
+
+ // Without the right boundary, the first query would also return the 3 rows inserted after it was generated, and the total would be 9 instead of 6.
+ assertEquals(numberRecordsFirstExecution + numberRecordsSecondExecution, 6);
+
+ runner.clearTransferState();
+ }
+
/**
* Simple implementation only for GenerateTableFetch processor testing.