You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ma...@apache.org on 2017/09/07 18:47:43 UTC
nifi git commit: NIFI-4257 - add custom WHERE clause in database
fetch processors
Repository: nifi
Updated Branches:
refs/heads/master ae30c7f35 -> c10ff574c
NIFI-4257 - add custom WHERE clause in database fetch processors
Signed-off-by: Matthew Burgess <ma...@apache.org>
This closes #2050
Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/c10ff574
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/c10ff574
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/c10ff574
Branch: refs/heads/master
Commit: c10ff574c4602fe05f5d1dae5eb0b1bd24026c02
Parents: ae30c7f
Author: Pierre Villard <pi...@gmail.com>
Authored: Wed Aug 2 15:22:31 2017 +0200
Committer: Matthew Burgess <ma...@apache.org>
Committed: Thu Sep 7 14:41:26 2017 -0400
----------------------------------------------------------------------
.../AbstractDatabaseFetchProcessor.java | 9 ++
.../processors/standard/GenerateTableFetch.java | 7 +
.../processors/standard/QueryDatabaseTable.java | 21 ++-
.../standard/QueryDatabaseTableTest.java | 153 +++++++++++++++++--
.../standard/TestGenerateTableFetch.java | 115 ++++++++++++++
5 files changed, 288 insertions(+), 17 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/nifi/blob/c10ff574/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractDatabaseFetchProcessor.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractDatabaseFetchProcessor.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractDatabaseFetchProcessor.java
index ebe23d0..1f26976 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractDatabaseFetchProcessor.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractDatabaseFetchProcessor.java
@@ -146,6 +146,15 @@ public abstract class AbstractDatabaseFetchProcessor extends AbstractSessionFact
.expressionLanguageSupported(true)
.build();
+ public static final PropertyDescriptor WHERE_CLAUSE = new PropertyDescriptor.Builder()
+ .name("db-fetch-where-clause")
+ .displayName("Additional WHERE clause")
+ .description("A custom clause to be added in the WHERE condition when generating SQL requests.")
+ .required(false)
+ .expressionLanguageSupported(true)
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+ .build();
+
protected List<PropertyDescriptor> propDescriptors;
// The delimiter to use when referencing qualified names (such as table@!@column in the state map)
http://git-wip-us.apache.org/repos/asf/nifi/blob/c10ff574/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GenerateTableFetch.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GenerateTableFetch.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GenerateTableFetch.java
index 3db2782..12278a3 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GenerateTableFetch.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GenerateTableFetch.java
@@ -132,6 +132,7 @@ public class GenerateTableFetch extends AbstractDatabaseFetchProcessor {
pds.add(MAX_VALUE_COLUMN_NAMES);
pds.add(QUERY_TIMEOUT);
pds.add(PARTITION_SIZE);
+ pds.add(WHERE_CLAUSE);
propDescriptors = Collections.unmodifiableList(pds);
}
@@ -185,6 +186,7 @@ public class GenerateTableFetch extends AbstractDatabaseFetchProcessor {
final String columnNames = context.getProperty(COLUMN_NAMES).evaluateAttributeExpressions(fileToProcess).getValue();
final String maxValueColumnNames = context.getProperty(MAX_VALUE_COLUMN_NAMES).evaluateAttributeExpressions(fileToProcess).getValue();
final int partitionSize = context.getProperty(PARTITION_SIZE).evaluateAttributeExpressions(fileToProcess).asInteger();
+ final String customWhereClause = context.getProperty(WHERE_CLAUSE).evaluateAttributeExpressions(fileToProcess).getValue();
final StateManager stateManager = context.getStateManager();
final StateMap stateMap;
@@ -248,6 +250,11 @@ public class GenerateTableFetch extends AbstractDatabaseFetchProcessor {
}
});
+ if(customWhereClause != null) {
+ // adding the custom WHERE clause (if defined) to the list of existing clauses.
+ maxValueClauses.add("(" + customWhereClause + ")");
+ }
+
whereClause = StringUtils.join(maxValueClauses, " AND ");
columnsClause = StringUtils.join(maxValueSelectColumns, ", ");
http://git-wip-us.apache.org/repos/asf/nifi/blob/c10ff574/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryDatabaseTable.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryDatabaseTable.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryDatabaseTable.java
index 8ae157a..0532b79 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryDatabaseTable.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryDatabaseTable.java
@@ -161,6 +161,7 @@ public class QueryDatabaseTable extends AbstractDatabaseFetchProcessor {
pds.add(USE_AVRO_LOGICAL_TYPES);
pds.add(DEFAULT_PRECISION);
pds.add(DEFAULT_SCALE);
+ pds.add(WHERE_CLAUSE);
propDescriptors = Collections.unmodifiableList(pds);
}
@@ -192,6 +193,7 @@ public class QueryDatabaseTable extends AbstractDatabaseFetchProcessor {
final String tableName = context.getProperty(TABLE_NAME).evaluateAttributeExpressions().getValue();
final String columnNames = context.getProperty(COLUMN_NAMES).evaluateAttributeExpressions().getValue();
final String maxValueColumnNames = context.getProperty(MAX_VALUE_COLUMN_NAMES).evaluateAttributeExpressions().getValue();
+ final String customWhereClause = context.getProperty(WHERE_CLAUSE).evaluateAttributeExpressions().getValue();
final Integer fetchSize = context.getProperty(FETCH_SIZE).evaluateAttributeExpressions().asInteger();
final Integer maxRowsPerFlowFile = context.getProperty(MAX_ROWS_PER_FLOW_FILE).evaluateAttributeExpressions().asInteger();
final Integer maxFragments = context.getProperty(MAX_FRAGMENTS).isSet()
@@ -243,7 +245,7 @@ public class QueryDatabaseTable extends AbstractDatabaseFetchProcessor {
List<String> maxValueColumnNameList = StringUtils.isEmpty(maxValueColumnNames)
? null
: Arrays.asList(maxValueColumnNames.split("\\s*,\\s*"));
- final String selectQuery = getQuery(dbAdapter, tableName, columnNames, maxValueColumnNameList, statePropertyMap);
+ final String selectQuery = getQuery(dbAdapter, tableName, columnNames, maxValueColumnNameList, customWhereClause, statePropertyMap);
final StopWatch stopWatch = new StopWatch(true);
final String fragmentIdentifier = UUID.randomUUID().toString();
@@ -363,15 +365,15 @@ public class QueryDatabaseTable extends AbstractDatabaseFetchProcessor {
}
protected String getQuery(DatabaseAdapter dbAdapter, String tableName, String columnNames, List<String> maxValColumnNames,
- Map<String, String> stateMap) {
+ String customWhereClause, Map<String, String> stateMap) {
if (StringUtils.isEmpty(tableName)) {
throw new IllegalArgumentException("Table name must be specified");
}
final StringBuilder query = new StringBuilder(dbAdapter.getSelectStatement(tableName, columnNames, null, null, null, null));
+ List<String> whereClauses = new ArrayList<>();
// Check state map for last max values
if (stateMap != null && !stateMap.isEmpty() && maxValColumnNames != null) {
- List<String> whereClauses = new ArrayList<>(maxValColumnNames.size());
IntStream.range(0, maxValColumnNames.size()).forEach((index) -> {
String colName = maxValColumnNames.get(index);
String maxValueKey = getStateKey(tableName, colName);
@@ -392,10 +394,15 @@ public class QueryDatabaseTable extends AbstractDatabaseFetchProcessor {
whereClauses.add(colName + (index == 0 ? " > " : " >= ") + getLiteralByType(type, maxValue, dbAdapter.getName()));
}
});
- if (!whereClauses.isEmpty()) {
- query.append(" WHERE ");
- query.append(StringUtils.join(whereClauses, " AND "));
- }
+ }
+
+ if (customWhereClause != null) {
+ whereClauses.add("(" + customWhereClause + ")");
+ }
+
+ if (!whereClauses.isEmpty()) {
+ query.append(" WHERE ");
+ query.append(StringUtils.join(whereClauses, " AND "));
}
return query.toString();
http://git-wip-us.apache.org/repos/asf/nifi/blob/c10ff574/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/QueryDatabaseTableTest.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/QueryDatabaseTableTest.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/QueryDatabaseTableTest.java
index b015371..8a2319b 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/QueryDatabaseTableTest.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/QueryDatabaseTableTest.java
@@ -45,13 +45,13 @@ import org.junit.Test;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
-import java.text.SimpleDateFormat;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.sql.SQLNonTransientConnectionException;
import java.sql.Statement;
import java.sql.Types;
+import java.text.SimpleDateFormat;
import java.util.Arrays;
import java.util.Calendar;
import java.util.Collections;
@@ -132,12 +132,12 @@ public class QueryDatabaseTableTest {
@Test
public void testGetQuery() throws Exception {
- String query = processor.getQuery(dbAdapter, "myTable", null, null, null);
+ String query = processor.getQuery(dbAdapter, "myTable", null, null, null, null);
assertEquals("SELECT * FROM myTable", query);
- query = processor.getQuery(dbAdapter, "myTable", "col1,col2", null, null);
+ query = processor.getQuery(dbAdapter, "myTable", "col1,col2", null, null, null);
assertEquals("SELECT col1,col2 FROM myTable", query);
- query = processor.getQuery(dbAdapter, "myTable", null, Collections.singletonList("id"), null);
+ query = processor.getQuery(dbAdapter, "myTable", null, Collections.singletonList("id"), null, null);
assertEquals("SELECT * FROM myTable", query);
Map<String, String> maxValues = new HashMap<>();
@@ -145,24 +145,24 @@ public class QueryDatabaseTableTest {
StateManager stateManager = runner.getStateManager();
stateManager.setState(maxValues, Scope.CLUSTER);
processor.putColumnType("mytable" + AbstractDatabaseFetchProcessor.NAMESPACE_DELIMITER + "id", Types.INTEGER);
- query = processor.getQuery(dbAdapter, "myTable", null, Collections.singletonList("id"), stateManager.getState(Scope.CLUSTER).toMap());
+ query = processor.getQuery(dbAdapter, "myTable", null, Collections.singletonList("id"), null, stateManager.getState(Scope.CLUSTER).toMap());
assertEquals("SELECT * FROM myTable WHERE id > 509", query);
maxValues.put("date_created", "2016-03-07 12:34:56");
stateManager.setState(maxValues, Scope.CLUSTER);
processor.putColumnType("mytable" + AbstractDatabaseFetchProcessor.NAMESPACE_DELIMITER + "date_created", Types.TIMESTAMP);
- query = processor.getQuery(dbAdapter, "myTable", null, Arrays.asList("id", "DATE_CREATED"), stateManager.getState(Scope.CLUSTER).toMap());
+ query = processor.getQuery(dbAdapter, "myTable", null, Arrays.asList("id", "DATE_CREATED"), null, stateManager.getState(Scope.CLUSTER).toMap());
assertEquals("SELECT * FROM myTable WHERE id > 509 AND DATE_CREATED >= '2016-03-07 12:34:56'", query);
// Test Oracle strategy
dbAdapter = new OracleDatabaseAdapter();
- query = processor.getQuery(dbAdapter, "myTable", null, Arrays.asList("id", "DATE_CREATED"), stateManager.getState(Scope.CLUSTER).toMap());
- assertEquals("SELECT * FROM myTable WHERE id > 509 AND DATE_CREATED >= timestamp '2016-03-07 12:34:56'", query);
+ query = processor.getQuery(dbAdapter, "myTable", null, Arrays.asList("id", "DATE_CREATED"), "type = \"CUSTOMER\"", stateManager.getState(Scope.CLUSTER).toMap());
+ assertEquals("SELECT * FROM myTable WHERE id > 509 AND DATE_CREATED >= timestamp '2016-03-07 12:34:56' AND (type = \"CUSTOMER\")", query);
}
@Test(expected = IllegalArgumentException.class)
public void testGetQueryNoTable() throws Exception {
- processor.getQuery(dbAdapter, null, null, null, null);
+ processor.getQuery(dbAdapter, null, null, null, null, null);
}
@Test
@@ -587,7 +587,7 @@ public class QueryDatabaseTableTest {
runner.clearTransferState();
// Run again with a cleaned state. Should get all rows split into batches
- int ffCount = (int) Math.ceil((double)rowCount / 9D);
+ int ffCount = (int) Math.ceil(rowCount / 9D);
runner.getStateManager().clear(Scope.CLUSTER);
runner.run();
runner.assertAllFlowFilesTransferred(QueryDatabaseTable.REL_SUCCESS, ffCount);
@@ -717,6 +717,139 @@ public class QueryDatabaseTableTest {
runner.clearTransferState();
}
+ @Test
+ public void testAddedRowsCustomWhereClause() throws ClassNotFoundException, SQLException, InitializationException, IOException {
+
+ // load test data to database
+ final Connection con = ((DBCPService) runner.getControllerService("dbcp")).getConnection();
+ Statement stmt = con.createStatement();
+
+ try {
+ stmt.execute("drop table TEST_QUERY_DB_TABLE");
+ } catch (final SQLException sqle) {
+ // Ignore this error, probably a "table does not exist" since Derby doesn't yet support DROP IF EXISTS [DERBY-4842]
+ }
+
+ stmt.execute("create table TEST_QUERY_DB_TABLE (id integer not null, type varchar(20), name varchar(100), scale float, created_on timestamp, bignum bigint default 0)");
+ stmt.execute("insert into TEST_QUERY_DB_TABLE (id, type, name, scale, created_on) VALUES (0, 'male', 'Joe Smith', 1.0, '1962-09-23 03:23:34.234')");
+ stmt.execute("insert into TEST_QUERY_DB_TABLE (id, type, name, scale, created_on) VALUES (1, 'female', 'Carrie Jones', 5.0, '2000-01-01 03:23:34.234')");
+ stmt.execute("insert into TEST_QUERY_DB_TABLE (id, type, name, scale, created_on) VALUES (2, NULL, NULL, 2.0, '2010-01-01 00:00:00')");
+
+ runner.setProperty(QueryDatabaseTable.TABLE_NAME, "TEST_QUERY_DB_TABLE");
+ runner.setProperty(QueryDatabaseTable.WHERE_CLAUSE, "type = 'male'");
+ runner.setIncomingConnection(false);
+ runner.setProperty(QueryDatabaseTable.MAX_VALUE_COLUMN_NAMES, "ID");
+ runner.setProperty(QueryDatabaseTable.MAX_ROWS_PER_FLOW_FILE,"2");
+
+ runner.run();
+ runner.assertAllFlowFilesTransferred(QueryDatabaseTable.REL_SUCCESS, 1);
+
+ MockFlowFile flowFile = runner.getFlowFilesForRelationship(QueryDatabaseTable.REL_SUCCESS).get(0);
+ assertEquals("TEST_QUERY_DB_TABLE", flowFile.getAttribute(QueryDatabaseTable.RESULT_TABLENAME));
+ assertEquals(flowFile.getAttribute("maxvalue.id"), "0");
+ InputStream in = new ByteArrayInputStream(flowFile.toByteArray());
+ runner.setProperty(QueryDatabaseTable.FETCH_SIZE, "2");
+ assertEquals(1, getNumberOfRecordsFromStream(in));
+
+ runner.clearTransferState();
+
+ // Run again, this time no flowfiles/rows should be transferred
+ runner.run();
+ runner.assertAllFlowFilesTransferred(QueryDatabaseTable.REL_SUCCESS, 0);
+ runner.clearTransferState();
+
+ //Remove Max Rows Per Flow File
+ runner.setProperty(QueryDatabaseTable.MAX_ROWS_PER_FLOW_FILE,"0");
+
+ // Add a new row with a higher ID but type 'female'; no flow file should be transferred since the row is excluded by the custom WHERE clause
+ stmt.execute("insert into TEST_QUERY_DB_TABLE (id, type, name, scale, created_on) VALUES (3, 'female', 'Mary West', 15.0, '2000-01-01 03:23:34.234')");
+ runner.run();
+ runner.assertAllFlowFilesTransferred(QueryDatabaseTable.REL_SUCCESS, 0);
+ runner.clearTransferState();
+
+ // Sanity check - run again, this time no flowfiles/rows should be transferred
+ runner.run();
+ runner.assertAllFlowFilesTransferred(QueryDatabaseTable.REL_SUCCESS, 0);
+ runner.clearTransferState();
+
+ // Add timestamp as a max value column name
+ runner.setProperty(QueryDatabaseTable.MAX_VALUE_COLUMN_NAMES, "id, created_on");
+
+ // Add a new row with a higher ID and run, one flow file will be transferred because no max value for the timestamp has been stored
+ stmt.execute("insert into TEST_QUERY_DB_TABLE (id, type, name, scale, created_on) VALUES (4, 'male', 'Marty Johnson', 15.0, '2011-01-01 03:23:34.234')");
+ runner.run();
+ runner.assertAllFlowFilesTransferred(QueryDatabaseTable.REL_SUCCESS, 1);
+ flowFile = runner.getFlowFilesForRelationship(QueryDatabaseTable.REL_SUCCESS).get(0);
+ assertEquals(flowFile.getAttribute("maxvalue.id"), "4");
+ assertEquals(flowFile.getAttribute("maxvalue.created_on"), "2011-01-01 03:23:34.234");
+ in = new ByteArrayInputStream(flowFile.toByteArray());
+ assertEquals(1, getNumberOfRecordsFromStream(in));
+ runner.clearTransferState();
+
+ // Add a new row with a higher ID but lower timestamp and run, no flow file will be transferred
+ stmt.execute("insert into TEST_QUERY_DB_TABLE (id, type, name, scale, created_on) VALUES (5, 'male', 'NO NAME', 15.0, '2001-01-01 03:23:34.234')");
+ runner.run();
+ runner.assertAllFlowFilesTransferred(QueryDatabaseTable.REL_SUCCESS, 0);
+ runner.clearTransferState();
+
+ // Add a new row with a higher ID and run, one flow file will be transferred
+ stmt.execute("insert into TEST_QUERY_DB_TABLE (id, type, name, scale, created_on) VALUES (6, 'male', 'Mr. NiFi', 1.0, '2012-01-01 03:23:34.234')");
+ runner.run();
+ runner.assertAllFlowFilesTransferred(QueryDatabaseTable.REL_SUCCESS, 1);
+ in = new ByteArrayInputStream(runner.getFlowFilesForRelationship(QueryDatabaseTable.REL_SUCCESS).get(0).toByteArray());
+ assertEquals(1, getNumberOfRecordsFromStream(in));
+ runner.clearTransferState();
+
+ // Set name as the max value column name (and clear the state), all rows should be returned since the max value for name has not been set
+ runner.getStateManager().clear(Scope.CLUSTER);
+ runner.setProperty(QueryDatabaseTable.MAX_VALUE_COLUMN_NAMES, "name");
+ runner.run();
+ runner.assertAllFlowFilesTransferred(QueryDatabaseTable.REL_SUCCESS, 1);
+ in = new ByteArrayInputStream(runner.getFlowFilesForRelationship(QueryDatabaseTable.REL_SUCCESS).get(0).toByteArray());
+ assertEquals(4, getNumberOfRecordsFromStream(in));
+ runner.clearTransferState();
+
+ // Add a new row with a "higher" name than the max but lower than "NULL" (to test that null values are skipped), one flow file will be transferred
+ stmt.execute("insert into TEST_QUERY_DB_TABLE (id, type, name, scale, created_on) VALUES (7, 'male', 'NULK', 1.0, '2012-01-01 03:23:34.234')");
+ runner.run();
+ runner.assertAllFlowFilesTransferred(QueryDatabaseTable.REL_SUCCESS, 1);
+ in = new ByteArrayInputStream(runner.getFlowFilesForRelationship(QueryDatabaseTable.REL_SUCCESS).get(0).toByteArray());
+ assertEquals(1, getNumberOfRecordsFromStream(in));
+ runner.clearTransferState();
+
+ // Set scale as the max value column name (and clear the state), all rows should be returned since the max value for scale has not been set
+ runner.getStateManager().clear(Scope.CLUSTER);
+ runner.setProperty(QueryDatabaseTable.MAX_VALUE_COLUMN_NAMES, "scale");
+ runner.run();
+ runner.assertAllFlowFilesTransferred(QueryDatabaseTable.REL_SUCCESS, 1);
+ in = new ByteArrayInputStream(runner.getFlowFilesForRelationship(QueryDatabaseTable.REL_SUCCESS).get(0).toByteArray());
+ assertEquals(5, getNumberOfRecordsFromStream(in));
+ runner.clearTransferState();
+
+ // Add a new row with a higher value for scale than the max, one flow file will be transferred
+ stmt.execute("insert into TEST_QUERY_DB_TABLE (id, type, name, scale, created_on) VALUES (8, 'male', 'NULK', 100.0, '2012-01-01 03:23:34.234')");
+ runner.run();
+ runner.assertAllFlowFilesTransferred(QueryDatabaseTable.REL_SUCCESS, 1);
+ in = new ByteArrayInputStream(runner.getFlowFilesForRelationship(QueryDatabaseTable.REL_SUCCESS).get(0).toByteArray());
+ assertEquals(1, getNumberOfRecordsFromStream(in));
+ runner.clearTransferState();
+
+ // Set bignum as the max value column name (and clear the state), all rows should be returned since the max value for bignum has not been set
+ runner.getStateManager().clear(Scope.CLUSTER);
+ runner.setProperty(QueryDatabaseTable.MAX_VALUE_COLUMN_NAMES, "bignum");
+ runner.run();
+ runner.assertAllFlowFilesTransferred(QueryDatabaseTable.REL_SUCCESS, 1);
+ in = new ByteArrayInputStream(runner.getFlowFilesForRelationship(QueryDatabaseTable.REL_SUCCESS).get(0).toByteArray());
+ assertEquals(6, getNumberOfRecordsFromStream(in));
+ runner.clearTransferState();
+
+ // Add a new row with a higher bignum but type 'female'; no flow file will be transferred since the row is excluded by the custom WHERE clause
+ stmt.execute("insert into TEST_QUERY_DB_TABLE (id, type, name, scale, created_on, bignum) VALUES (9, 'female', 'Alice Bob', 100.0, '2012-01-01 03:23:34.234', 1)");
+ runner.run();
+ runner.assertAllFlowFilesTransferred(QueryDatabaseTable.REL_SUCCESS, 0);
+ runner.clearTransferState();
+ }
+
private long getNumberOfRecordsFromStream(InputStream in) throws IOException {
final DatumReader<GenericRecord> datumReader = new GenericDatumReader<>();
try (DataFileStream<GenericRecord> dataFileReader = new DataFileStream<>(in, datumReader)) {
http://git-wip-us.apache.org/repos/asf/nifi/blob/c10ff574/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestGenerateTableFetch.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestGenerateTableFetch.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestGenerateTableFetch.java
index 8d549fd..d8791a5 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestGenerateTableFetch.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestGenerateTableFetch.java
@@ -924,6 +924,121 @@ public class TestGenerateTableFetch {
runner.clearTransferState();
}
+ @Test
+ public void testAddedRowsWithCustomWhereClause() throws ClassNotFoundException, SQLException, InitializationException, IOException {
+
+ // load test data to database
+ final Connection con = ((DBCPService) runner.getControllerService("dbcp")).getConnection();
+ Statement stmt = con.createStatement();
+
+ try {
+ stmt.execute("drop table TEST_QUERY_DB_TABLE");
+ } catch (final SQLException sqle) {
+ // Ignore this error, probably a "table does not exist" since Derby doesn't yet support DROP IF EXISTS [DERBY-4842]
+ }
+
+ stmt.execute("create table TEST_QUERY_DB_TABLE (id integer not null, type varchar(20), name varchar(100), scale float, created_on timestamp, bignum bigint default 0)");
+ stmt.execute("insert into TEST_QUERY_DB_TABLE (id, type, name, scale, created_on) VALUES (0, 'male', 'Joe Smith', 1.0, '1962-09-23 03:23:34.234')");
+ stmt.execute("insert into TEST_QUERY_DB_TABLE (id, type, name, scale, created_on) VALUES (1, 'female', 'Carrie Jones', 5.0, '2000-01-01 03:23:34.234')");
+ stmt.execute("insert into TEST_QUERY_DB_TABLE (id, type, name, scale, created_on) VALUES (2, NULL, NULL, 2.0, '2010-01-01 00:00:00')");
+
+ runner.setProperty(GenerateTableFetch.TABLE_NAME, "TEST_QUERY_DB_TABLE");
+ runner.setProperty(GenerateTableFetch.WHERE_CLAUSE, "type = 'male' OR type IS NULL");
+ runner.setIncomingConnection(false);
+ runner.setProperty(GenerateTableFetch.MAX_VALUE_COLUMN_NAMES, "ID");
+
+ runner.run();
+ runner.assertAllFlowFilesTransferred(REL_SUCCESS, 1);
+ MockFlowFile flowFile = runner.getFlowFilesForRelationship(REL_SUCCESS).get(0);
+ String query = new String(flowFile.toByteArray());
+ assertEquals("SELECT * FROM TEST_QUERY_DB_TABLE WHERE (type = 'male' OR type IS NULL)"
+ + " AND ID <= 2 ORDER BY ID FETCH NEXT 10000 ROWS ONLY", query);
+ ResultSet resultSet = stmt.executeQuery(query);
+ // Should be two records
+ assertTrue(resultSet.next());
+ assertTrue(resultSet.next());
+ assertFalse(resultSet.next());
+ runner.clearTransferState();
+
+ // Run again, this time no flowfiles/rows should be transferred
+ runner.run();
+ runner.assertAllFlowFilesTransferred(REL_SUCCESS, 0);
+ runner.clearTransferState();
+
+ // Add 3 new rows with a higher ID and run with a partition size of 2. Two flow files should be transferred
+ stmt.execute("insert into TEST_QUERY_DB_TABLE (id, type, name, scale, created_on) VALUES (3, 'female', 'Mary West', 15.0, '2000-01-01 03:23:34.234')");
+ stmt.execute("insert into TEST_QUERY_DB_TABLE (id, type, name, scale, created_on) VALUES (4, 'male', 'Marty Johnson', 15.0, '2011-01-01 03:23:34.234')");
+ stmt.execute("insert into TEST_QUERY_DB_TABLE (id, type, name, scale, created_on) VALUES (5, 'male', 'Marty Johnson', 15.0, '2011-01-01 03:23:34.234')");
+ runner.setProperty(GenerateTableFetch.PARTITION_SIZE, "1");
+ runner.run();
+ runner.assertAllFlowFilesTransferred(REL_SUCCESS, 2);
+
+ // Verify first flow file's contents
+ flowFile = runner.getFlowFilesForRelationship(REL_SUCCESS).get(0);
+ query = new String(flowFile.toByteArray());
+ assertEquals("SELECT * FROM TEST_QUERY_DB_TABLE WHERE ID > 2 AND (type = 'male' OR type IS NULL)"
+ + " AND ID <= 5 ORDER BY ID FETCH NEXT 1 ROWS ONLY", query);
+ resultSet = stmt.executeQuery(query);
+ // Should be one record
+ assertTrue(resultSet.next());
+ assertFalse(resultSet.next());
+
+ // Verify second flow file's contents
+ flowFile = runner.getFlowFilesForRelationship(REL_SUCCESS).get(1);
+ query = new String(flowFile.toByteArray());
+ assertEquals("SELECT * FROM TEST_QUERY_DB_TABLE WHERE ID > 2 AND (type = 'male' OR type IS NULL)"
+ + " AND ID <= 5 ORDER BY ID OFFSET 1 ROWS FETCH NEXT 1 ROWS ONLY", query);
+ resultSet = stmt.executeQuery(query);
+ // Should be one record
+ assertTrue(resultSet.next());
+ assertFalse(resultSet.next());
+ runner.clearTransferState();
+
+ // Add a new row with a higher ID and run, one flow file will be transferred
+ stmt.execute("insert into TEST_QUERY_DB_TABLE (id, type, name, scale, created_on) VALUES (6, 'male', 'Mr. NiFi', 1.0, '2012-01-01 03:23:34.234')");
+ runner.run();
+ runner.assertAllFlowFilesTransferred(REL_SUCCESS, 1);
+ flowFile = runner.getFlowFilesForRelationship(REL_SUCCESS).get(0);
+ query = new String(flowFile.toByteArray());
+ assertEquals("SELECT * FROM TEST_QUERY_DB_TABLE WHERE ID > 5 AND (type = 'male' OR type IS NULL)"
+ + " AND ID <= 6 ORDER BY ID FETCH NEXT 1 ROWS ONLY", query);
+ resultSet = stmt.executeQuery(query);
+ // Should be one record
+ assertTrue(resultSet.next());
+ assertFalse(resultSet.next());
+ runner.clearTransferState();
+
+ // Set name as the max value column name (and clear the state), all rows should be returned since the max value for name has not been set
+ runner.getStateManager().clear(Scope.CLUSTER);
+ runner.setProperty(GenerateTableFetch.MAX_VALUE_COLUMN_NAMES, "name");
+ runner.setProperty(GenerateTableFetch.COLUMN_NAMES, "id, type, name, scale, created_on");
+ runner.run();
+ runner.assertAllFlowFilesTransferred(REL_SUCCESS, 5); // 5 records with partition size 1 means 5 generated FlowFiles
+ flowFile = runner.getFlowFilesForRelationship(REL_SUCCESS).get(0);
+ assertEquals("SELECT id, type, name, scale, created_on FROM TEST_QUERY_DB_TABLE WHERE (type = 'male' OR type IS NULL)"
+ + " AND name <= 'Mr. NiFi' ORDER BY name FETCH NEXT 1 ROWS ONLY", new String(flowFile.toByteArray()));
+ flowFile = runner.getFlowFilesForRelationship(REL_SUCCESS).get(1);
+ assertEquals("SELECT id, type, name, scale, created_on FROM TEST_QUERY_DB_TABLE WHERE (type = 'male' OR type IS NULL)"
+ + " AND name <= 'Mr. NiFi' ORDER BY name OFFSET 1 ROWS FETCH NEXT 1 ROWS ONLY", new String(flowFile.toByteArray()));
+ flowFile = runner.getFlowFilesForRelationship(REL_SUCCESS).get(2);
+ assertEquals("SELECT id, type, name, scale, created_on FROM TEST_QUERY_DB_TABLE WHERE (type = 'male' OR type IS NULL)"
+ + " AND name <= 'Mr. NiFi' ORDER BY name OFFSET 2 ROWS FETCH NEXT 1 ROWS ONLY", new String(flowFile.toByteArray()));
+ flowFile = runner.getFlowFilesForRelationship(REL_SUCCESS).get(3);
+ assertEquals("SELECT id, type, name, scale, created_on FROM TEST_QUERY_DB_TABLE WHERE (type = 'male' OR type IS NULL)"
+ + " AND name <= 'Mr. NiFi' ORDER BY name OFFSET 3 ROWS FETCH NEXT 1 ROWS ONLY", new String(flowFile.toByteArray()));
+ flowFile = runner.getFlowFilesForRelationship(REL_SUCCESS).get(4);
+ assertEquals("SELECT id, type, name, scale, created_on FROM TEST_QUERY_DB_TABLE WHERE (type = 'male' OR type IS NULL)"
+ + " AND name <= 'Mr. NiFi' ORDER BY name OFFSET 4 ROWS FETCH NEXT 1 ROWS ONLY", new String(flowFile.toByteArray()));
+
+ assertEquals("TEST_QUERY_DB_TABLE", flowFile.getAttribute("generatetablefetch.tableName"));
+ assertEquals("id, type, name, scale, created_on", flowFile.getAttribute("generatetablefetch.columnNames"));
+ assertEquals("(type = 'male' OR type IS NULL) AND name <= 'Mr. NiFi'", flowFile.getAttribute("generatetablefetch.whereClause"));
+ assertEquals("name", flowFile.getAttribute("generatetablefetch.maxColumnNames"));
+ assertEquals("1", flowFile.getAttribute("generatetablefetch.limit"));
+ assertEquals("4", flowFile.getAttribute("generatetablefetch.offset"));
+
+ runner.clearTransferState();
+ }
/**
* Simple implementation only for GenerateTableFetch processor testing.