You are viewing a plain-text version of this content. The link to the canonical version was not preserved in this copy.
Posted to commits@nifi.apache.org by pv...@apache.org on 2018/01/23 14:12:08 UTC

nifi git commit: NIFI-4773: Moved DB Fetch processors' connection code from setup to onTrigger

Repository: nifi
Updated Branches:
  refs/heads/master 6f282c684 -> 84848f7cb


NIFI-4773: Moved DB Fetch processors' connection code from setup to onTrigger

Signed-off-by: Pierre Villard <pi...@gmail.com>

This closes #2422.


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/84848f7c
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/84848f7c
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/84848f7c

Branch: refs/heads/master
Commit: 84848f7cbb55fe6b6ad7d1c22166cef6d5a0b39c
Parents: 6f282c6
Author: Matthew Burgess <ma...@apache.org>
Authored: Mon Jan 22 13:42:35 2018 -0500
Committer: Pierre Villard <pi...@gmail.com>
Committed: Tue Jan 23 15:10:34 2018 +0100

----------------------------------------------------------------------
 .../AbstractDatabaseFetchProcessor.java         | 85 ++++++++++++--------
 .../processors/standard/GenerateTableFetch.java |  7 +-
 .../processors/standard/QueryDatabaseTable.java |  9 ++-
 3 files changed, 61 insertions(+), 40 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/84848f7c/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractDatabaseFetchProcessor.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractDatabaseFetchProcessor.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractDatabaseFetchProcessor.java
index fa2a86e..2145929 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractDatabaseFetchProcessor.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractDatabaseFetchProcessor.java
@@ -50,6 +50,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.ServiceLoader;
 import java.util.Set;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import static java.sql.Types.ARRAY;
 import static java.sql.Types.BIGINT;
@@ -176,6 +177,10 @@ public abstract class AbstractDatabaseFetchProcessor extends AbstractSessionFact
     // pre-fetched when the processor is scheduled, rather than having to populate them on-the-fly.
     protected volatile boolean isDynamicMaxValues = false;
 
+    // This flag is cleared at the start of setup() and whenever the max-value column names property changes, and is set to true once setup() has successfully fetched the max-value column types. This enables
+    // the setup logic to be performed in onTrigger() rather than in an @OnScheduled method, avoiding DB connection issues when the processor is first scheduled to run.
+    protected final AtomicBoolean setupComplete = new AtomicBoolean(false);
+
     private static SimpleDateFormat TIME_TYPE_FORMAT = new SimpleDateFormat("HH:mm:ss.SSS");
 
     // A Map (name to value) of initial maximum-value properties, filled at schedule-time and used at trigger-time
@@ -222,49 +227,61 @@ public abstract class AbstractDatabaseFetchProcessor extends AbstractSessionFact
         return super.customValidate(validationContext);
     }
 
+    @Override
+    public void onPropertyModified(PropertyDescriptor descriptor, String oldValue, String newValue) {
+        // If the max-value columns have changed, we need to re-fetch the column info from the DB
+        if (MAX_VALUE_COLUMN_NAMES.equals(descriptor) && newValue != null && !newValue.equals(oldValue)) {
+            setupComplete.set(false);
+        }
+    }
+
     public void setup(final ProcessContext context) {
         setup(context,true,null);
     }
 
     public void setup(final ProcessContext context, boolean shouldCleanCache, FlowFile flowFile) {
-        final String maxValueColumnNames = context.getProperty(MAX_VALUE_COLUMN_NAMES).evaluateAttributeExpressions(flowFile).getValue();
+        synchronized (setupComplete) {
+            setupComplete.set(false);
+            final String maxValueColumnNames = context.getProperty(MAX_VALUE_COLUMN_NAMES).evaluateAttributeExpressions(flowFile).getValue();
 
-        // If there are no max-value column names specified, we don't need to perform this processing
-        if (StringUtils.isEmpty(maxValueColumnNames)) {
-            return;
-        }
+            // If there are no max-value column names specified, we don't need to perform this processing
+            if (StringUtils.isEmpty(maxValueColumnNames)) {
+                return;
+            }
 
-        // Try to fill the columnTypeMap with the types of the desired max-value columns
-        final DBCPService dbcpService = context.getProperty(DBCP_SERVICE).asControllerService(DBCPService.class);
-        final String tableName = context.getProperty(TABLE_NAME).evaluateAttributeExpressions(flowFile).getValue();
-
-        final DatabaseAdapter dbAdapter = dbAdapters.get(context.getProperty(DB_TYPE).getValue());
-        try (final Connection con = dbcpService.getConnection();
-             final Statement st = con.createStatement()) {
-
-            // Try a query that returns no rows, for the purposes of getting metadata about the columns. It is possible
-            // to use DatabaseMetaData.getColumns(), but not all drivers support this, notably the schema-on-read
-            // approach as in Apache Drill
-            String query = dbAdapter.getSelectStatement(tableName, maxValueColumnNames, "1 = 0", null, null, null);
-            ResultSet resultSet = st.executeQuery(query);
-            ResultSetMetaData resultSetMetaData = resultSet.getMetaData();
-            int numCols = resultSetMetaData.getColumnCount();
-            if (numCols > 0) {
-                if (shouldCleanCache){
-                    columnTypeMap.clear();
-                }
-                for (int i = 1; i <= numCols; i++) {
-                    String colName = resultSetMetaData.getColumnName(i).toLowerCase();
-                    String colKey = getStateKey(tableName, colName);
-                    int colType = resultSetMetaData.getColumnType(i);
-                    columnTypeMap.putIfAbsent(colKey, colType);
+            // Try to fill the columnTypeMap with the types of the desired max-value columns
+            final DBCPService dbcpService = context.getProperty(DBCP_SERVICE).asControllerService(DBCPService.class);
+            final String tableName = context.getProperty(TABLE_NAME).evaluateAttributeExpressions(flowFile).getValue();
+
+            final DatabaseAdapter dbAdapter = dbAdapters.get(context.getProperty(DB_TYPE).getValue());
+            try (final Connection con = dbcpService.getConnection();
+                 final Statement st = con.createStatement()) {
+
+                // Try a query that returns no rows, for the purposes of getting metadata about the columns. It is possible
+                // to use DatabaseMetaData.getColumns(), but not all drivers support this, notably the schema-on-read
+                // approach as in Apache Drill
+                String query = dbAdapter.getSelectStatement(tableName, maxValueColumnNames, "1 = 0", null, null, null);
+                ResultSet resultSet = st.executeQuery(query);
+                ResultSetMetaData resultSetMetaData = resultSet.getMetaData();
+                int numCols = resultSetMetaData.getColumnCount();
+                if (numCols > 0) {
+                    if (shouldCleanCache) {
+                        columnTypeMap.clear();
+                    }
+                    for (int i = 1; i <= numCols; i++) {
+                        String colName = resultSetMetaData.getColumnName(i).toLowerCase();
+                        String colKey = getStateKey(tableName, colName);
+                        int colType = resultSetMetaData.getColumnType(i);
+                        columnTypeMap.putIfAbsent(colKey, colType);
+                    }
+                } else {
+                    throw new ProcessException("No columns found in table from those specified: " + maxValueColumnNames);
                 }
-            } else {
-                throw new ProcessException("No columns found in table from those specified: " + maxValueColumnNames);
-            }
 
-        } catch (SQLException e) {
-            throw new ProcessException("Unable to communicate with database in order to determine column types", e);
+            } catch (SQLException e) {
+                throw new ProcessException("Unable to communicate with database in order to determine column types", e);
+            }
+            setupComplete.set(true);
         }
     }
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/84848f7c/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GenerateTableFetch.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GenerateTableFetch.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GenerateTableFetch.java
index 1bae371..8f535b3 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GenerateTableFetch.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GenerateTableFetch.java
@@ -155,9 +155,6 @@ public class GenerateTableFetch extends AbstractDatabaseFetchProcessor {
     @OnScheduled
     public void setup(final ProcessContext context) {
         maxValueProperties = getDefaultMaxValueProperties(context.getProperties());
-        if (!isDynamicTableName && !isDynamicMaxValues) {
-            super.setup(context);
-        }
         if (context.hasIncomingConnection() && !context.hasNonLoopConnection()) {
             getLogger().error("The failure relationship can be used only if there is another incoming connection to this processor.");
         }
@@ -165,6 +162,10 @@ public class GenerateTableFetch extends AbstractDatabaseFetchProcessor {
 
     @Override
     public void onTrigger(final ProcessContext context, final ProcessSessionFactory sessionFactory) throws ProcessException {
+        // Fetch the column/table info once (if the table name and max value columns are not dynamic). Otherwise do the setup later
+        if (!isDynamicTableName && !isDynamicMaxValues && !setupComplete.get()) {
+            super.setup(context);
+        }
         ProcessSession session = sessionFactory.createSession();
 
         FlowFile fileToProcess = null;

http://git-wip-us.apache.org/repos/asf/nifi/blob/84848f7c/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryDatabaseTable.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryDatabaseTable.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryDatabaseTable.java
index 0532b79..ffe5845 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryDatabaseTable.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryDatabaseTable.java
@@ -18,10 +18,10 @@ package org.apache.nifi.processors.standard;
 
 import org.apache.commons.lang3.StringUtils;
 import org.apache.nifi.annotation.behavior.DynamicProperty;
-import org.apache.nifi.annotation.behavior.EventDriven;
 import org.apache.nifi.annotation.behavior.InputRequirement;
 import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
 import org.apache.nifi.annotation.behavior.Stateful;
+import org.apache.nifi.annotation.behavior.TriggerSerially;
 import org.apache.nifi.annotation.behavior.WritesAttribute;
 import org.apache.nifi.annotation.behavior.WritesAttributes;
 import org.apache.nifi.annotation.documentation.CapabilityDescription;
@@ -73,7 +73,7 @@ import static org.apache.nifi.processors.standard.util.JdbcCommon.NORMALIZE_NAME
 import static org.apache.nifi.processors.standard.util.JdbcCommon.USE_AVRO_LOGICAL_TYPES;
 
 
-@EventDriven
+@TriggerSerially
 @InputRequirement(Requirement.INPUT_FORBIDDEN)
 @Tags({"sql", "select", "jdbc", "query", "database"})
 @SeeAlso({GenerateTableFetch.class, ExecuteSQL.class})
@@ -178,11 +178,14 @@ public class QueryDatabaseTable extends AbstractDatabaseFetchProcessor {
     @OnScheduled
     public void setup(final ProcessContext context) {
         maxValueProperties = getDefaultMaxValueProperties(context.getProperties());
-        super.setup(context);
     }
 
     @Override
     public void onTrigger(final ProcessContext context, final ProcessSessionFactory sessionFactory) throws ProcessException {
+        // Fetch the column/table info once
+        if (!setupComplete.get()) {
+            super.setup(context);
+        }
         ProcessSession session = sessionFactory.createSession();
         final List<FlowFile> resultSetFlowFiles = new ArrayList<>();