Posted to commits@nifi.apache.org by ma...@apache.org on 2017/09/22 18:34:49 UTC

nifi git commit: NIFI-4395 - GenerateTableFetch can't fetch column type by state after instance reboot

Repository: nifi
Updated Branches:
  refs/heads/master 9e2c7be7d -> a29348f2a


NIFI-4395 - GenerateTableFetch can't fetch column type by state after instance reboot

NIFI-4395: Updated unit test for GenerateTableFetch
Signed-off-by: Matthew Burgess <ma...@apache.org>

This closes #2166


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/a29348f2
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/a29348f2
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/a29348f2

Branch: refs/heads/master
Commit: a29348f2a4d8ec6576f1ec73c57911b827d46315
Parents: 9e2c7be
Author: Deon Huang <yj...@gmail.com>
Authored: Thu Sep 21 15:34:00 2017 +0800
Committer: Matthew Burgess <ma...@apache.org>
Committed: Fri Sep 22 14:34:00 2017 -0400

----------------------------------------------------------------------
 .../AbstractDatabaseFetchProcessor.java         | 13 +++--
 .../processors/standard/GenerateTableFetch.java | 25 ++++++---
 .../standard/TestGenerateTableFetch.java        | 55 ++++++++++++++++++++
 3 files changed, 82 insertions(+), 11 deletions(-)
----------------------------------------------------------------------
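
For context on the fix below: the processor keeps the JDBC types of its
max-value columns in an in-memory map (columnTypeMap) that is populated at
schedule time, while the max values themselves live in managed state and
survive a restart. After a reboot the state still holds max values, but the
type cache is empty, so getColumnType() failed before any query could be
built. A minimal sketch of the lazy-rebuild pattern the fix applies, with
illustrative names and a simplified cache key rather than the actual NiFi
classes:

    import java.sql.Types;
    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;

    // Illustrative sketch only; the real logic is in the diffs below.
    public class LazyColumnTypeCache {

        // "table.column" -> java.sql.Types constant. Survives between
        // onTrigger calls, but not a JVM restart.
        private final Map<String, Integer> columnTypeMap = new ConcurrentHashMap<>();

        // Mirrors setup(context, shouldCleanCache, flowFile): only clear
        // the cache when explicitly asked (the @OnScheduled path).
        public void setup(boolean shouldCleanCache) {
            if (shouldCleanCache) {
                columnTypeMap.clear();
            }
            // The real processor queries table metadata over JDBC; here we
            // cache one hypothetical column.
            columnTypeMap.put("test_query_db_table.id", Types.INTEGER);
        }

        public Integer getColumnType(String tableName, String colName) {
            if (columnTypeMap.isEmpty()) {
                // Cache lost (e.g. instance reboot): rebuild without
                // clearing, as super.setup(context, false, flowFile) does.
                setup(false);
            }
            Integer type = columnTypeMap.get(tableName + "." + colName);
            if (type == null) {
                throw new IllegalStateException("No column type cache found for: " + colName);
            }
            return type;
        }
    }

In the commit itself the isEmpty() guard sits in onTrigger just before the
getColumnType() call, as the GenerateTableFetch hunks below show.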


http://git-wip-us.apache.org/repos/asf/nifi/blob/a29348f2/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractDatabaseFetchProcessor.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractDatabaseFetchProcessor.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractDatabaseFetchProcessor.java
index 1f26976..fa2a86e 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractDatabaseFetchProcessor.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractDatabaseFetchProcessor.java
@@ -22,6 +22,7 @@ import org.apache.nifi.components.ValidationContext;
 import org.apache.nifi.components.ValidationResult;
 import org.apache.nifi.dbcp.DBCPService;
 import org.apache.nifi.expression.AttributeExpression;
+import org.apache.nifi.flowfile.FlowFile;
 import org.apache.nifi.processor.AbstractSessionFactoryProcessor;
 import org.apache.nifi.processor.ProcessContext;
 import org.apache.nifi.processor.Relationship;
@@ -222,7 +223,11 @@ public abstract class AbstractDatabaseFetchProcessor extends AbstractSessionFact
     }
 
     public void setup(final ProcessContext context) {
-        final String maxValueColumnNames = context.getProperty(MAX_VALUE_COLUMN_NAMES).evaluateAttributeExpressions().getValue();
+        setup(context, true, null);
+    }
+
+    public void setup(final ProcessContext context, boolean shouldCleanCache, FlowFile flowFile) {
+        final String maxValueColumnNames = context.getProperty(MAX_VALUE_COLUMN_NAMES).evaluateAttributeExpressions(flowFile).getValue();
 
         // If there are no max-value column names specified, we don't need to perform this processing
         if (StringUtils.isEmpty(maxValueColumnNames)) {
@@ -231,7 +236,7 @@ public abstract class AbstractDatabaseFetchProcessor extends AbstractSessionFact
 
         // Try to fill the columnTypeMap with the types of the desired max-value columns
         final DBCPService dbcpService = context.getProperty(DBCP_SERVICE).asControllerService(DBCPService.class);
-        final String tableName = context.getProperty(TABLE_NAME).evaluateAttributeExpressions().getValue();
+        final String tableName = context.getProperty(TABLE_NAME).evaluateAttributeExpressions(flowFile).getValue();
 
         final DatabaseAdapter dbAdapter = dbAdapters.get(context.getProperty(DB_TYPE).getValue());
         try (final Connection con = dbcpService.getConnection();
@@ -245,7 +250,9 @@ public abstract class AbstractDatabaseFetchProcessor extends AbstractSessionFact
             ResultSetMetaData resultSetMetaData = resultSet.getMetaData();
             int numCols = resultSetMetaData.getColumnCount();
             if (numCols > 0) {
-                columnTypeMap.clear();
+                if (shouldCleanCache) {
+                    columnTypeMap.clear();
+                }
                 for (int i = 1; i <= numCols; i++) {
                     String colName = resultSetMetaData.getColumnName(i).toLowerCase();
                     String colKey = getStateKey(tableName, colName);

http://git-wip-us.apache.org/repos/asf/nifi/blob/a29348f2/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GenerateTableFetch.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GenerateTableFetch.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GenerateTableFetch.java
index 12278a3..1bae371 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GenerateTableFetch.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GenerateTableFetch.java
@@ -86,12 +86,12 @@ import java.util.stream.IntStream;
         + "per the State Management documentation")
 @WritesAttributes({
         @WritesAttribute(attribute = "generatetablefetch.sql.error", description = "If the processor has incoming connections, and processing an incoming flow file causes "
-        + "a SQL Exception, the flow file is routed to failure and this attribute is set to the exception message."),
+                + "a SQL Exception, the flow file is routed to failure and this attribute is set to the exception message."),
         @WritesAttribute(attribute = "generatetablefetch.tableName", description = "The name of the database table to be queried."),
         @WritesAttribute(attribute = "generatetablefetch.columnNames", description = "The comma-separated list of column names used in the query."),
         @WritesAttribute(attribute = "generatetablefetch.whereClause", description = "Where clause used in the query to get the expected rows."),
         @WritesAttribute(attribute = "generatetablefetch.maxColumnNames", description = "The comma-separated list of column names used to keep track of data "
-                    + "that has been returned since the processor started running."),
+                + "that has been returned since the processor started running."),
         @WritesAttribute(attribute = "generatetablefetch.limit", description = "The number of result rows to be fetched by the SQL statement."),
         @WritesAttribute(attribute = "generatetablefetch.offset", description = "Offset to be used to retrieve the corresponding partition.")
 })
@@ -155,11 +155,10 @@ public class GenerateTableFetch extends AbstractDatabaseFetchProcessor {
     @OnScheduled
     public void setup(final ProcessContext context) {
         maxValueProperties = getDefaultMaxValueProperties(context.getProperties());
-        // Pre-fetch the column types if using a static table name and max-value columns
         if (!isDynamicTableName && !isDynamicMaxValues) {
             super.setup(context);
         }
-        if(context.hasIncomingConnection() && !context.hasNonLoopConnection()) {
+        if (context.hasIncomingConnection() && !context.hasNonLoopConnection()) {
             getLogger().error("The failure relationship can be used only if there is another incoming connection to this processor.");
         }
     }
@@ -190,6 +189,8 @@ public class GenerateTableFetch extends AbstractDatabaseFetchProcessor {
 
         final StateManager stateManager = context.getStateManager();
         final StateMap stateMap;
+        FlowFile finalFileToProcess = fileToProcess;
+
 
         try {
             stateMap = stateManager.getState(Scope.CLUSTER);
@@ -243,6 +244,10 @@ public class GenerateTableFetch extends AbstractDatabaseFetchProcessor {
                 maxValueSelectColumns.add("MAX(" + colName + ") " + colName);
                 String maxValue = getColumnStateMaxValue(tableName, statePropertyMap, colName);
                 if (!StringUtils.isEmpty(maxValue)) {
+                    if (columnTypeMap.isEmpty()) {
+                        // The column type cache is empty (e.g. cleared by an instance reboot), so re-cache the column types
+                        super.setup(context, false, finalFileToProcess);
+                    }
                     Integer type = getColumnType(tableName, colName);
 
                     // Add a condition for the WHERE clause
@@ -250,7 +255,7 @@ public class GenerateTableFetch extends AbstractDatabaseFetchProcessor {
                 }
             });
 
-            if(customWhereClause != null) {
+            if (customWhereClause != null) {
                 // adding the custom WHERE clause (if defined) to the list of existing clauses.
                 maxValueClauses.add("(" + customWhereClause + ")");
             }
@@ -263,7 +268,7 @@ public class GenerateTableFetch extends AbstractDatabaseFetchProcessor {
             long rowCount = 0;
 
             try (final Connection con = dbcpService.getConnection();
-                final Statement st = con.createStatement()) {
+                 final Statement st = con.createStatement()) {
 
                 final Integer queryTimeout = context.getProperty(QUERY_TIMEOUT).evaluateAttributeExpressions(fileToProcess).asTimePeriod(TimeUnit.SECONDS).intValue();
                 st.setQueryTimeout(queryTimeout); // timeout in seconds
@@ -283,7 +288,7 @@ public class GenerateTableFetch extends AbstractDatabaseFetchProcessor {
                         //Some JDBC drivers consider the columns name and label to be very different things.
                         // Since this column has been aliased lets check the label first,
                         // if there is no label we'll use the column name.
-                        String resultColumnName = (StringUtils.isNotEmpty(rsmd.getColumnLabel(i))?rsmd.getColumnLabel(i):rsmd.getColumnName(i)).toLowerCase();
+                        String resultColumnName = (StringUtils.isNotEmpty(rsmd.getColumnLabel(i)) ? rsmd.getColumnLabel(i) : rsmd.getColumnName(i)).toLowerCase();
                         String fullyQualifiedStateKey = getStateKey(tableName, resultColumnName);
                         String resultColumnCurrentMax = statePropertyMap.get(fullyQualifiedStateKey);
                         if (StringUtils.isEmpty(resultColumnCurrentMax) && !isDynamicTableName) {
@@ -321,6 +326,10 @@ public class GenerateTableFetch extends AbstractDatabaseFetchProcessor {
                     maxValueSelectColumns.add("MAX(" + colName + ") " + colName);
                     String maxValue = getColumnStateMaxValue(tableName, statePropertyMap, colName);
                     if (!StringUtils.isEmpty(maxValue)) {
+                        if (columnTypeMap.isEmpty()) {
+                            // The column type cache is empty (e.g. cleared by an instance reboot), so re-cache the column types
+                            super.setup(context, false, finalFileToProcess);
+                        }
                         Integer type = getColumnType(tableName, colName);
 
                         // Add a condition for the WHERE clause
@@ -411,7 +420,7 @@ public class GenerateTableFetch extends AbstractDatabaseFetchProcessor {
         }
         if (type == null) {
             // This shouldn't happen as we are populating columnTypeMap when the processor is scheduled or when the first maximum is observed
-            throw new IllegalArgumentException("No column type found for: " + colName);
+            throw new ProcessException("No column type cache found for: " + colName);
         }
 
         return type;

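
One behavioral note on the last hunk: ProcessException is NiFi's standard
processor failure type, so the missing-type case now flows through the
framework's normal error handling (and, with an incoming connection, can end
up routed to failure with the generatetablefetch.sql.error attribute
described above) instead of surfacing as an IllegalArgumentException that
reads like a programming error. A hedged sketch of the consuming side; the
actual catch blocks are outside this diff:

    try {
        Integer type = getColumnType(tableName, colName); // may now throw ProcessException
        // ... add the WHERE clause condition for this max-value column ...
    } catch (ProcessException pe) {
        // Sketch only: attach the message and route the incoming flow file
        // to failure, matching the attribute contract documented above.
        flowFile = session.putAttribute(flowFile, "generatetablefetch.sql.error", pe.getMessage());
        session.transfer(flowFile, REL_FAILURE);
    }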
http://git-wip-us.apache.org/repos/asf/nifi/blob/a29348f2/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestGenerateTableFetch.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestGenerateTableFetch.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestGenerateTableFetch.java
index d8791a5..67a9bad 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestGenerateTableFetch.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestGenerateTableFetch.java
@@ -1040,6 +1040,61 @@ public class TestGenerateTableFetch {
         runner.clearTransferState();
     }
 
+    @Test
+    public void testColumnTypeMissing() throws ClassNotFoundException, SQLException, InitializationException, IOException {
+        // Load test data to database
+        final Connection con = ((DBCPService) runner.getControllerService("dbcp")).getConnection();
+        Statement stmt = con.createStatement();
+
+        try {
+            stmt.execute("drop table TEST_QUERY_DB_TABLE");
+        } catch (final SQLException sqle) {
+            // Ignore this error; it's probably a "table does not exist" failure, since Derby doesn't yet support DROP IF EXISTS [DERBY-4842]
+        }
+
+        stmt.execute("create table TEST_QUERY_DB_TABLE (id integer not null, bucket integer not null)");
+        stmt.execute("insert into TEST_QUERY_DB_TABLE (id, bucket) VALUES (0, 0)");
+        stmt.execute("insert into TEST_QUERY_DB_TABLE (id, bucket) VALUES (1, 0)");
+
+        runner.setProperty(GenerateTableFetch.TABLE_NAME, "TEST_QUERY_DB_TABLE");
+        runner.setIncomingConnection(true);
+
+        runner.setProperty(GenerateTableFetch.TABLE_NAME, "${tableName}");
+        runner.setIncomingConnection(true);
+        runner.setProperty(GenerateTableFetch.MAX_VALUE_COLUMN_NAMES, "${maxValueCol}");
+        runner.enqueue("".getBytes(), new HashMap<String, String>() {{
+            put("tableName", "TEST_QUERY_DB_TABLE");
+            put("maxValueCol", "id");
+        }});
+        runner.run();
+        runner.assertAllFlowFilesTransferred(REL_SUCCESS, 1);
+        MockFlowFile flowFile = runner.getFlowFilesForRelationship(REL_SUCCESS).get(0);
+        String query = new String(flowFile.toByteArray());
+        assertEquals("SELECT * FROM TEST_QUERY_DB_TABLE WHERE id <= 1 ORDER BY id FETCH NEXT 10000 ROWS ONLY", query);
+        runner.clearTransferState();
+
+        // Clear columnTypeMap to simulate the cache being empty after an instance reboot
+        processor.columnTypeMap.clear();
+
+        // Insert new records
+        stmt.execute("insert into TEST_QUERY_DB_TABLE (id, bucket) VALUES (2, 0)");
+
+        // Enqueue another FlowFile to see if re-caching the column types works
+        runner.enqueue("".getBytes(), new HashMap<String, String>() {{
+            put("tableName", "TEST_QUERY_DB_TABLE");
+            put("maxValueCol", "id");
+        }});
+
+        // The run should re-cache the column types
+        runner.run();
+        runner.assertAllFlowFilesTransferred(REL_SUCCESS, 1);
+        flowFile = runner.getFlowFilesForRelationship(REL_SUCCESS).get(0);
+        query = new String(flowFile.toByteArray());
+        assertEquals("SELECT * FROM TEST_QUERY_DB_TABLE WHERE id > 1 AND id <= 2 ORDER BY id FETCH NEXT 10000 ROWS ONLY", query);
+        runner.clearTransferState();
+    }
+
     /**
      * Simple implementation only for GenerateTableFetch processor testing.
      */
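
A note on the test technique: the new test simulates a reboot by clearing
processor.columnTypeMap directly, which is sufficient because that map is
the only in-memory state the fix depends on; the TestRunner's state manager
keeps the max values across run() calls, just as managed state survives a
real restart. The idiom in short (attrs stands for the tableName/maxValueCol
attribute map shown above):

    processor.columnTypeMap.clear();      // the in-memory cache is gone, as after a restart
    runner.enqueue("".getBytes(), attrs); // managed state still holds id's max value
    runner.run();                         // the lazy re-cache path must rebuild the map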