You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by pv...@apache.org on 2018/01/23 14:12:08 UTC
nifi git commit: NIFI-4773: Moved DB Fetch processors' connection
code from setup to onTrigger
Repository: nifi
Updated Branches:
refs/heads/master 6f282c684 -> 84848f7cb
NIFI-4773: Moved DB Fetch processors' connection code from setup to onTrigger
Signed-off-by: Pierre Villard <pi...@gmail.com>
This closes #2422.
Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/84848f7c
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/84848f7c
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/84848f7c
Branch: refs/heads/master
Commit: 84848f7cbb55fe6b6ad7d1c22166cef6d5a0b39c
Parents: 6f282c6
Author: Matthew Burgess <ma...@apache.org>
Authored: Mon Jan 22 13:42:35 2018 -0500
Committer: Pierre Villard <pi...@gmail.com>
Committed: Tue Jan 23 15:10:34 2018 +0100
----------------------------------------------------------------------
.../AbstractDatabaseFetchProcessor.java | 85 ++++++++++++--------
.../processors/standard/GenerateTableFetch.java | 7 +-
.../processors/standard/QueryDatabaseTable.java | 9 ++-
3 files changed, 61 insertions(+), 40 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/nifi/blob/84848f7c/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractDatabaseFetchProcessor.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractDatabaseFetchProcessor.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractDatabaseFetchProcessor.java
index fa2a86e..2145929 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractDatabaseFetchProcessor.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractDatabaseFetchProcessor.java
@@ -50,6 +50,7 @@ import java.util.List;
import java.util.Map;
import java.util.ServiceLoader;
import java.util.Set;
+import java.util.concurrent.atomic.AtomicBoolean;
import static java.sql.Types.ARRAY;
import static java.sql.Types.BIGINT;
@@ -176,6 +177,10 @@ public abstract class AbstractDatabaseFetchProcessor extends AbstractSessionFact
// pre-fetched when the processor is scheduled, rather than having to populate them on-the-fly.
protected volatile boolean isDynamicMaxValues = false;
+ // This value is cleared when the processor is scheduled, and set to true after setup() is called and completes successfully. This enables
+ // the setup logic to be performed in onTrigger() versus OnScheduled to avoid any issues with DB connection when first scheduled to run.
+ protected final AtomicBoolean setupComplete = new AtomicBoolean(false);
+
private static SimpleDateFormat TIME_TYPE_FORMAT = new SimpleDateFormat("HH:mm:ss.SSS");
// A Map (name to value) of initial maximum-value properties, filled at schedule-time and used at trigger-time
@@ -222,49 +227,61 @@ public abstract class AbstractDatabaseFetchProcessor extends AbstractSessionFact
return super.customValidate(validationContext);
}
+ @Override
+ public void onPropertyModified(PropertyDescriptor descriptor, String oldValue, String newValue) {
+ // If the max-value columns have changed, we need to re-fetch the column info from the DB
+ if (MAX_VALUE_COLUMN_NAMES.equals(descriptor) && newValue != null && !newValue.equals(oldValue)) {
+ setupComplete.set(false);
+ }
+ }
+
public void setup(final ProcessContext context) {
setup(context,true,null);
}
public void setup(final ProcessContext context, boolean shouldCleanCache, FlowFile flowFile) {
- final String maxValueColumnNames = context.getProperty(MAX_VALUE_COLUMN_NAMES).evaluateAttributeExpressions(flowFile).getValue();
+ synchronized (setupComplete) {
+ setupComplete.set(false);
+ final String maxValueColumnNames = context.getProperty(MAX_VALUE_COLUMN_NAMES).evaluateAttributeExpressions(flowFile).getValue();
- // If there are no max-value column names specified, we don't need to perform this processing
- if (StringUtils.isEmpty(maxValueColumnNames)) {
- return;
- }
+ // If there are no max-value column names specified, we don't need to perform this processing
+ if (StringUtils.isEmpty(maxValueColumnNames)) {
+ return;
+ }
- // Try to fill the columnTypeMap with the types of the desired max-value columns
- final DBCPService dbcpService = context.getProperty(DBCP_SERVICE).asControllerService(DBCPService.class);
- final String tableName = context.getProperty(TABLE_NAME).evaluateAttributeExpressions(flowFile).getValue();
-
- final DatabaseAdapter dbAdapter = dbAdapters.get(context.getProperty(DB_TYPE).getValue());
- try (final Connection con = dbcpService.getConnection();
- final Statement st = con.createStatement()) {
-
- // Try a query that returns no rows, for the purposes of getting metadata about the columns. It is possible
- // to use DatabaseMetaData.getColumns(), but not all drivers support this, notably the schema-on-read
- // approach as in Apache Drill
- String query = dbAdapter.getSelectStatement(tableName, maxValueColumnNames, "1 = 0", null, null, null);
- ResultSet resultSet = st.executeQuery(query);
- ResultSetMetaData resultSetMetaData = resultSet.getMetaData();
- int numCols = resultSetMetaData.getColumnCount();
- if (numCols > 0) {
- if (shouldCleanCache){
- columnTypeMap.clear();
- }
- for (int i = 1; i <= numCols; i++) {
- String colName = resultSetMetaData.getColumnName(i).toLowerCase();
- String colKey = getStateKey(tableName, colName);
- int colType = resultSetMetaData.getColumnType(i);
- columnTypeMap.putIfAbsent(colKey, colType);
+ // Try to fill the columnTypeMap with the types of the desired max-value columns
+ final DBCPService dbcpService = context.getProperty(DBCP_SERVICE).asControllerService(DBCPService.class);
+ final String tableName = context.getProperty(TABLE_NAME).evaluateAttributeExpressions(flowFile).getValue();
+
+ final DatabaseAdapter dbAdapter = dbAdapters.get(context.getProperty(DB_TYPE).getValue());
+ try (final Connection con = dbcpService.getConnection();
+ final Statement st = con.createStatement()) {
+
+ // Try a query that returns no rows, for the purposes of getting metadata about the columns. It is possible
+ // to use DatabaseMetaData.getColumns(), but not all drivers support this, notably the schema-on-read
+ // approach as in Apache Drill
+ String query = dbAdapter.getSelectStatement(tableName, maxValueColumnNames, "1 = 0", null, null, null);
+ ResultSet resultSet = st.executeQuery(query);
+ ResultSetMetaData resultSetMetaData = resultSet.getMetaData();
+ int numCols = resultSetMetaData.getColumnCount();
+ if (numCols > 0) {
+ if (shouldCleanCache) {
+ columnTypeMap.clear();
+ }
+ for (int i = 1; i <= numCols; i++) {
+ String colName = resultSetMetaData.getColumnName(i).toLowerCase();
+ String colKey = getStateKey(tableName, colName);
+ int colType = resultSetMetaData.getColumnType(i);
+ columnTypeMap.putIfAbsent(colKey, colType);
+ }
+ } else {
+ throw new ProcessException("No columns found in table from those specified: " + maxValueColumnNames);
}
- } else {
- throw new ProcessException("No columns found in table from those specified: " + maxValueColumnNames);
- }
- } catch (SQLException e) {
- throw new ProcessException("Unable to communicate with database in order to determine column types", e);
+ } catch (SQLException e) {
+ throw new ProcessException("Unable to communicate with database in order to determine column types", e);
+ }
+ setupComplete.set(true);
}
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/84848f7c/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GenerateTableFetch.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GenerateTableFetch.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GenerateTableFetch.java
index 1bae371..8f535b3 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GenerateTableFetch.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GenerateTableFetch.java
@@ -155,9 +155,6 @@ public class GenerateTableFetch extends AbstractDatabaseFetchProcessor {
@OnScheduled
public void setup(final ProcessContext context) {
maxValueProperties = getDefaultMaxValueProperties(context.getProperties());
- if (!isDynamicTableName && !isDynamicMaxValues) {
- super.setup(context);
- }
if (context.hasIncomingConnection() && !context.hasNonLoopConnection()) {
getLogger().error("The failure relationship can be used only if there is another incoming connection to this processor.");
}
@@ -165,6 +162,10 @@ public class GenerateTableFetch extends AbstractDatabaseFetchProcessor {
@Override
public void onTrigger(final ProcessContext context, final ProcessSessionFactory sessionFactory) throws ProcessException {
+ // Fetch the column/table info once (if the table name and max value columns are not dynamic). Otherwise do the setup later
+ if (!isDynamicTableName && !isDynamicMaxValues && !setupComplete.get()) {
+ super.setup(context);
+ }
ProcessSession session = sessionFactory.createSession();
FlowFile fileToProcess = null;
http://git-wip-us.apache.org/repos/asf/nifi/blob/84848f7c/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryDatabaseTable.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryDatabaseTable.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryDatabaseTable.java
index 0532b79..ffe5845 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryDatabaseTable.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryDatabaseTable.java
@@ -18,10 +18,10 @@ package org.apache.nifi.processors.standard;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.annotation.behavior.DynamicProperty;
-import org.apache.nifi.annotation.behavior.EventDriven;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
import org.apache.nifi.annotation.behavior.Stateful;
+import org.apache.nifi.annotation.behavior.TriggerSerially;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
@@ -73,7 +73,7 @@ import static org.apache.nifi.processors.standard.util.JdbcCommon.NORMALIZE_NAME
import static org.apache.nifi.processors.standard.util.JdbcCommon.USE_AVRO_LOGICAL_TYPES;
-@EventDriven
+@TriggerSerially
@InputRequirement(Requirement.INPUT_FORBIDDEN)
@Tags({"sql", "select", "jdbc", "query", "database"})
@SeeAlso({GenerateTableFetch.class, ExecuteSQL.class})
@@ -178,11 +178,14 @@ public class QueryDatabaseTable extends AbstractDatabaseFetchProcessor {
@OnScheduled
public void setup(final ProcessContext context) {
maxValueProperties = getDefaultMaxValueProperties(context.getProperties());
- super.setup(context);
}
@Override
public void onTrigger(final ProcessContext context, final ProcessSessionFactory sessionFactory) throws ProcessException {
+ // Fetch the column/table info once
+ if (!setupComplete.get()) {
+ super.setup(context);
+ }
ProcessSession session = sessionFactory.createSession();
final List<FlowFile> resultSetFlowFiles = new ArrayList<>();