You are viewing a plain-text version of this content. (The link to the canonical version was not preserved in this copy.)
Posted to commits@nifi.apache.org by mc...@apache.org on 2016/09/06 20:53:11 UTC

nifi git commit: NIFI-2712: Fixed Fetch processors for multiple max-value columns. This closes #976

Repository: nifi
Updated Branches:
  refs/heads/master a7e76cc00 -> 2afc739ab


NIFI-2712: Fixed Fetch processors for multiple max-value columns. This closes #976


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/2afc739a
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/2afc739a
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/2afc739a

Branch: refs/heads/master
Commit: 2afc739ab72fa2bce8f64c8833f3eae1399966ce
Parents: a7e76cc
Author: Matt Burgess <ma...@apache.org>
Authored: Wed Aug 31 09:00:16 2016 -0400
Committer: Matt Gilman <ma...@gmail.com>
Committed: Tue Sep 6 16:52:40 2016 -0400

----------------------------------------------------------------------
 .../AbstractDatabaseFetchProcessor.java         |  6 +-
 .../processors/standard/GenerateTableFetch.java |  6 +-
 .../processors/standard/QueryDatabaseTable.java | 10 +--
 .../standard/QueryDatabaseTableTest.java        | 68 +++++++++++++++++++-
 .../standard/TestGenerateTableFetch.java        | 51 +++++++++++++++
 5 files changed, 130 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/2afc739a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractDatabaseFetchProcessor.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractDatabaseFetchProcessor.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractDatabaseFetchProcessor.java
index 7b30479..962e74e 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractDatabaseFetchProcessor.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractDatabaseFetchProcessor.java
@@ -111,8 +111,10 @@ public abstract class AbstractDatabaseFetchProcessor extends AbstractSessionFact
     public static final PropertyDescriptor MAX_VALUE_COLUMN_NAMES = new PropertyDescriptor.Builder()
             .name("Maximum-value Columns")
             .description("A comma-separated list of column names. The processor will keep track of the maximum value "
-                    + "for each column that has been returned since the processor started running. This can be used to "
-                    + "retrieve only those rows that have been added/updated since the last retrieval. Note that some "
+                    + "for each column that has been returned since the processor started running. Using multiple columns implies an order "
+                    + "to the column list, and each column's values are expected to increase more slowly than the previous columns' values. Thus, "
+                    + "using multiple columns implies a hierarchical structure of columns, which is usually used for partitioning tables. This processor "
+                    + "can be used to retrieve only those rows that have been added/updated since the last retrieval. Note that some "
                     + "JDBC types such as bit/boolean are not conducive to maintaining maximum value, so columns of these "
                     + "types should not be listed in this property, and will result in error(s) during processing. If no columns "
                     + "are provided, all rows from the table will be considered, which could have a performance impact.")

http://git-wip-us.apache.org/repos/asf/nifi/blob/2afc739a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GenerateTableFetch.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GenerateTableFetch.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GenerateTableFetch.java
index 23a2e3c..f1c86f8 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GenerateTableFetch.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GenerateTableFetch.java
@@ -56,6 +56,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
+import java.util.stream.IntStream;
 
 
 @TriggerSerially
@@ -160,7 +161,8 @@ public class GenerateTableFetch extends AbstractDatabaseFetchProcessor {
             maxValueSelectColumns.add("COUNT(*)");
 
             // For each maximum-value column, get a WHERE filter and a MAX(column) alias
-            maxValueColumnNameList.forEach(colName -> {
+            IntStream.range(0, maxValueColumnNameList.size()).forEach((index) -> {
+                String colName = maxValueColumnNameList.get(index);
                 maxValueSelectColumns.add("MAX(" + colName + ") " + colName);
                 String maxValue = statePropertyMap.get(colName.toLowerCase());
                 if (!StringUtils.isEmpty(maxValue)) {
@@ -170,7 +172,7 @@ public class GenerateTableFetch extends AbstractDatabaseFetchProcessor {
                         throw new IllegalArgumentException("No column type found for: " + colName);
                     }
                     // Add a condition for the WHERE clause
-                    maxValueClauses.add(colName + " > " + getLiteralByType(type, maxValue, dbAdapter.getName()));
+                    maxValueClauses.add(colName + (index == 0 ? " > " : " >= ") + getLiteralByType(type, maxValue, dbAdapter.getName()));
                 }
             });
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/2afc739a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryDatabaseTable.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryDatabaseTable.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryDatabaseTable.java
index cfe68b5..ed57854 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryDatabaseTable.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryDatabaseTable.java
@@ -64,6 +64,7 @@ import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.stream.IntStream;
 
 
 @EventDriven
@@ -325,9 +326,10 @@ public class QueryDatabaseTable extends AbstractDatabaseFetchProcessor {
         final StringBuilder query = new StringBuilder(dbAdapter.getSelectStatement(tableName, columnNames, null, null, null, null));
 
         // Check state map for last max values
-        if (stateMap != null  && !stateMap.isEmpty() && maxValColumnNames != null) {
+        if (stateMap != null && !stateMap.isEmpty() && maxValColumnNames != null) {
             List<String> whereClauses = new ArrayList<>(maxValColumnNames.size());
-            for (String colName : maxValColumnNames) {
+            IntStream.range(0, maxValColumnNames.size()).forEach((index) -> {
+                String colName = maxValColumnNames.get(index);
                 String maxValue = stateMap.get(colName.toLowerCase());
                 if (!StringUtils.isEmpty(maxValue)) {
                     Integer type = columnTypeMap.get(colName.toLowerCase());
@@ -336,9 +338,9 @@ public class QueryDatabaseTable extends AbstractDatabaseFetchProcessor {
                         throw new IllegalArgumentException("No column type found for: " + colName);
                     }
                     // Add a condition for the WHERE clause
-                    whereClauses.add(colName + " > " + getLiteralByType(type, maxValue, dbAdapter.getName()));
+                    whereClauses.add(colName + (index == 0 ? " > " : " >= ") + getLiteralByType(type, maxValue, dbAdapter.getName()));
                 }
-            }
+            });
             if (!whereClauses.isEmpty()) {
                 query.append(" WHERE ");
                 query.append(StringUtils.join(whereClauses, " AND "));

http://git-wip-us.apache.org/repos/asf/nifi/blob/2afc739a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/QueryDatabaseTableTest.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/QueryDatabaseTableTest.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/QueryDatabaseTableTest.java
index 979dd38..974a835 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/QueryDatabaseTableTest.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/QueryDatabaseTableTest.java
@@ -121,7 +121,8 @@ public class QueryDatabaseTableTest {
     }
 
     @After
-    public void teardown() {
+    public void teardown() throws IOException {
+        runner.getStateManager().clear(Scope.CLUSTER);
         runner = null;
         QueryDatabaseTable.dbAdapters.clear();
         QueryDatabaseTable.dbAdapters.putAll(origDbAdapters);
@@ -149,12 +150,12 @@ public class QueryDatabaseTableTest {
         stateManager.setState(maxValues, Scope.CLUSTER);
         processor.putColumnType("date_created", Types.TIMESTAMP);
         query = processor.getQuery(dbAdapter, "myTable", null, Arrays.asList("id", "DATE_CREATED"), stateManager.getState(Scope.CLUSTER).toMap());
-        assertEquals("SELECT * FROM myTable WHERE id > 509 AND DATE_CREATED > '2016-03-07 12:34:56'", query);
+        assertEquals("SELECT * FROM myTable WHERE id > 509 AND DATE_CREATED >= '2016-03-07 12:34:56'", query);
 
         // Test Oracle strategy
         dbAdapter = new OracleDatabaseAdapter();
         query = processor.getQuery(dbAdapter, "myTable", null, Arrays.asList("id", "DATE_CREATED"), stateManager.getState(Scope.CLUSTER).toMap());
-        assertEquals("SELECT * FROM myTable WHERE id > 509 AND DATE_CREATED > to_date('2016-03-07 12:34:56', 'yyyy-mm-dd HH24:MI:SS')", query);
+        assertEquals("SELECT * FROM myTable WHERE id > 509 AND DATE_CREATED >= to_date('2016-03-07 12:34:56', 'yyyy-mm-dd HH24:MI:SS')", query);
     }
 
     @Test(expected = IllegalArgumentException.class)
@@ -295,6 +296,67 @@ public class QueryDatabaseTableTest {
     }
 
     @Test
+    public void testMultiplePartitions() throws ClassNotFoundException, SQLException, InitializationException, IOException {
+
+        // load test data to database
+        final Connection con = ((DBCPService) runner.getControllerService("dbcp")).getConnection();
+        Statement stmt = con.createStatement();
+
+        try {
+            stmt.execute("drop table TEST_QUERY_DB_TABLE");
+        } catch (final SQLException sqle) {
+            // Ignore this error, probably a "table does not exist" since Derby doesn't yet support DROP IF EXISTS [DERBY-4842]
+        }
+
+        stmt.execute("create table TEST_QUERY_DB_TABLE (id integer not null, bucket integer not null)");
+        stmt.execute("insert into TEST_QUERY_DB_TABLE (id, bucket) VALUES (0, 0)");
+        stmt.execute("insert into TEST_QUERY_DB_TABLE (id, bucket) VALUES (1, 0)");
+
+        runner.setProperty(QueryDatabaseTable.TABLE_NAME, "TEST_QUERY_DB_TABLE");
+        runner.setIncomingConnection(false);
+        runner.setProperty(QueryDatabaseTable.MAX_VALUE_COLUMN_NAMES, "ID, BUCKET");
+
+        runner.run();
+        runner.assertAllFlowFilesTransferred(QueryDatabaseTable.REL_SUCCESS, 1);
+        assertEquals("2",
+                runner.getFlowFilesForRelationship(QueryDatabaseTable.REL_SUCCESS).get(0).getAttribute(QueryDatabaseTable.RESULT_ROW_COUNT)
+        );
+        runner.clearTransferState();
+
+        // Add a new row in the same bucket
+        stmt.execute("insert into TEST_QUERY_DB_TABLE (id, bucket) VALUES (2, 0)");
+        runner.run();
+        runner.assertAllFlowFilesTransferred(QueryDatabaseTable.REL_SUCCESS, 1);
+        assertEquals("1",
+                runner.getFlowFilesForRelationship(QueryDatabaseTable.REL_SUCCESS).get(0).getAttribute(QueryDatabaseTable.RESULT_ROW_COUNT)
+        );
+        runner.clearTransferState();
+
+        // Add a new row in a new bucket
+        stmt.execute("insert into TEST_QUERY_DB_TABLE (id, bucket) VALUES (3, 1)");
+        runner.run();
+        runner.assertAllFlowFilesTransferred(QueryDatabaseTable.REL_SUCCESS, 1);
+        assertEquals("1",
+                runner.getFlowFilesForRelationship(QueryDatabaseTable.REL_SUCCESS).get(0).getAttribute(QueryDatabaseTable.RESULT_ROW_COUNT)
+        );
+        runner.clearTransferState();
+
+        // Add a new row in an old bucket, it should not be transferred
+        stmt.execute("insert into TEST_QUERY_DB_TABLE (id, bucket) VALUES (4, 0)");
+        runner.run();
+        runner.assertTransferCount(QueryDatabaseTable.REL_SUCCESS, 0);
+
+        // Add a new row in the second bucket, only the new row should be transferred
+        stmt.execute("insert into TEST_QUERY_DB_TABLE (id, bucket) VALUES (5, 1)");
+        runner.run();
+        runner.assertAllFlowFilesTransferred(QueryDatabaseTable.REL_SUCCESS, 1);
+        assertEquals("1",
+                runner.getFlowFilesForRelationship(QueryDatabaseTable.REL_SUCCESS).get(0).getAttribute(QueryDatabaseTable.RESULT_ROW_COUNT)
+        );
+        runner.clearTransferState();
+    }
+
+    @Test
     public void testTimestampNanos() throws ClassNotFoundException, SQLException, InitializationException, IOException {
 
         // load test data to database

http://git-wip-us.apache.org/repos/asf/nifi/blob/2afc739a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestGenerateTableFetch.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestGenerateTableFetch.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestGenerateTableFetch.java
index 2f2fbd1..3b9f3a3 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestGenerateTableFetch.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestGenerateTableFetch.java
@@ -200,6 +200,57 @@ public class TestGenerateTableFetch {
         runner.clearTransferState();
     }
 
+    @Test
+    public void testMultiplePartitions() throws ClassNotFoundException, SQLException, InitializationException, IOException {
+
+        // load test data to database
+        final Connection con = ((DBCPService) runner.getControllerService("dbcp")).getConnection();
+        Statement stmt = con.createStatement();
+
+        try {
+            stmt.execute("drop table TEST_QUERY_DB_TABLE");
+        } catch (final SQLException sqle) {
+            // Ignore this error, probably a "table does not exist" since Derby doesn't yet support DROP IF EXISTS [DERBY-4842]
+        }
+
+        stmt.execute("create table TEST_QUERY_DB_TABLE (id integer not null, bucket integer not null)");
+        stmt.execute("insert into TEST_QUERY_DB_TABLE (id, bucket) VALUES (0, 0)");
+        stmt.execute("insert into TEST_QUERY_DB_TABLE (id, bucket) VALUES (1, 0)");
+
+        runner.setProperty(GenerateTableFetch.TABLE_NAME, "TEST_QUERY_DB_TABLE");
+        runner.setIncomingConnection(false);
+        runner.setProperty(GenerateTableFetch.MAX_VALUE_COLUMN_NAMES, "ID, BUCKET");
+        // Set partition size to 1 so we can compare flow files to records
+        runner.setProperty(GenerateTableFetch.PARTITION_SIZE, "1");
+
+        runner.run();
+        runner.assertAllFlowFilesTransferred(GenerateTableFetch.REL_SUCCESS, 2);
+        runner.clearTransferState();
+
+        // Add a new row in the same bucket
+        stmt.execute("insert into TEST_QUERY_DB_TABLE (id, bucket) VALUES (2, 0)");
+        runner.run();
+        runner.assertAllFlowFilesTransferred(GenerateTableFetch.REL_SUCCESS, 1);
+        runner.clearTransferState();
+
+        // Add a new row in a new bucket
+        stmt.execute("insert into TEST_QUERY_DB_TABLE (id, bucket) VALUES (3, 1)");
+        runner.run();
+        runner.assertAllFlowFilesTransferred(GenerateTableFetch.REL_SUCCESS, 1);
+        runner.clearTransferState();
+
+        // Add a new row in an old bucket, it should not be transferred
+        stmt.execute("insert into TEST_QUERY_DB_TABLE (id, bucket) VALUES (4, 0)");
+        runner.run();
+        runner.assertTransferCount(GenerateTableFetch.REL_SUCCESS, 0);
+
+        // Add a new row in the second bucket, only the new row should be transferred
+        stmt.execute("insert into TEST_QUERY_DB_TABLE (id, bucket) VALUES (5, 1)");
+        runner.run();
+        runner.assertAllFlowFilesTransferred(GenerateTableFetch.REL_SUCCESS, 1);
+        runner.clearTransferState();
+    }
+
 
     /**
      * Simple implementation only for ListDatabaseTables processor testing.