You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by jp...@apache.org on 2016/07/17 22:06:54 UTC
[2/2] nifi git commit: NIFI-2157: Add GenerateTableFetch processor
NIFI-2157: Add GenerateTableFetch processor
This closes #645
Signed-off-by: jpercivall <jo...@yahoo.com>
Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/01cae237
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/01cae237
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/01cae237
Branch: refs/heads/master
Commit: 01cae237454c0d7ffedbe7dd08dbe705bc22cb09
Parents: ad02b43
Author: Matt Burgess <ma...@apache.org>
Authored: Wed Jul 13 21:29:51 2016 -0400
Committer: jpercivall <jo...@yahoo.com>
Committed: Sun Jul 17 17:54:42 2016 -0400
----------------------------------------------------------------------
.../AbstractDatabaseFetchProcessor.java | 362 ++++++++++++++++++
.../processors/standard/GenerateTableFetch.java | 256 +++++++++++++
.../processors/standard/QueryDatabaseTable.java | 382 ++-----------------
.../processors/standard/db/DatabaseAdapter.java | 38 ++
.../db/impl/GenericDatabaseAdapter.java | 64 ++++
.../standard/db/impl/OracleDatabaseAdapter.java | 82 ++++
.../org.apache.nifi.processor.Processor | 1 +
....nifi.processors.standard.db.DatabaseAdapter | 16 +
.../standard/QueryDatabaseTableTest.java | 77 ++--
.../standard/TestGenerateTableFetch.java | 224 +++++++++++
.../standard/db/impl/DerbyDatabaseAdapter.java | 68 ++++
....nifi.processors.standard.db.DatabaseAdapter | 15 +
12 files changed, 1204 insertions(+), 381 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/nifi/blob/01cae237/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractDatabaseFetchProcessor.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractDatabaseFetchProcessor.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractDatabaseFetchProcessor.java
new file mode 100644
index 0000000..6182d93
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractDatabaseFetchProcessor.java
@@ -0,0 +1,362 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.standard;
+
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.dbcp.DBCPService;
+import org.apache.nifi.processor.AbstractSessionFactoryProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.processors.standard.db.DatabaseAdapter;
+
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.sql.Connection;
+import java.sql.ResultSet;
+import java.sql.ResultSetMetaData;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.sql.Timestamp;
+import java.text.DecimalFormat;
+import java.text.ParseException;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.ServiceLoader;
+import java.util.Set;
+
+import static java.sql.Types.ARRAY;
+import static java.sql.Types.BIGINT;
+import static java.sql.Types.BINARY;
+import static java.sql.Types.BIT;
+import static java.sql.Types.BLOB;
+import static java.sql.Types.BOOLEAN;
+import static java.sql.Types.CLOB;
+import static java.sql.Types.DECIMAL;
+import static java.sql.Types.DOUBLE;
+import static java.sql.Types.FLOAT;
+import static java.sql.Types.INTEGER;
+import static java.sql.Types.LONGVARBINARY;
+import static java.sql.Types.NUMERIC;
+import static java.sql.Types.CHAR;
+import static java.sql.Types.DATE;
+import static java.sql.Types.LONGNVARCHAR;
+import static java.sql.Types.LONGVARCHAR;
+import static java.sql.Types.NCHAR;
+import static java.sql.Types.NVARCHAR;
+import static java.sql.Types.REAL;
+import static java.sql.Types.ROWID;
+import static java.sql.Types.SMALLINT;
+import static java.sql.Types.TIME;
+import static java.sql.Types.TIMESTAMP;
+import static java.sql.Types.TINYINT;
+import static java.sql.Types.VARBINARY;
+import static java.sql.Types.VARCHAR;
+
+/**
+ * A base class for common code shared by processors that fetch RDBMS data.
+ */
+public abstract class AbstractDatabaseFetchProcessor extends AbstractSessionFactoryProcessor {
+
+ // Relationships
+ public static final Relationship REL_SUCCESS = new Relationship.Builder()
+ .name("success")
+ .description("Successfully created FlowFile from SQL query result set.")
+ .build();
+
+ protected Set<Relationship> relationships;
+
+ // Properties
+ public static final PropertyDescriptor DBCP_SERVICE = new PropertyDescriptor.Builder()
+ .name("Database Connection Pooling Service")
+ .description("The Controller Service that is used to obtain a connection to the database.")
+ .required(true)
+ .identifiesControllerService(DBCPService.class)
+ .build();
+
+ public static final PropertyDescriptor TABLE_NAME = new PropertyDescriptor.Builder()
+ .name("Table Name")
+ .description("The name of the database table to be queried.")
+ .required(true)
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+ .build();
+
+ public static final PropertyDescriptor COLUMN_NAMES = new PropertyDescriptor.Builder()
+ .name("Columns to Return")
+ .description("A comma-separated list of column names to be used in the query. If your database requires "
+ + "special treatment of the names (quoting, e.g.), each name should include such treatment. If no "
+ + "column names are supplied, all columns in the specified table will be returned.")
+ .required(false)
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+ .build();
+
+ public static final PropertyDescriptor MAX_VALUE_COLUMN_NAMES = new PropertyDescriptor.Builder()
+ .name("Maximum-value Columns")
+ .description("A comma-separated list of column names. The processor will keep track of the maximum value "
+ + "for each column that has been returned since the processor started running. This can be used to "
+ + "retrieve only those rows that have been added/updated since the last retrieval. Note that some "
+ + "JDBC types such as bit/boolean are not conducive to maintaining maximum value, so columns of these "
+ + "types should not be listed in this property, and will result in error(s) during processing. If no columns "
+ + "are provided, all rows from the table will be considered, which could have a performance impact.")
+ .required(false)
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+ .build();
+
+ public static final PropertyDescriptor QUERY_TIMEOUT = new PropertyDescriptor.Builder()
+ .name("Max Wait Time")
+ .description("The maximum amount of time allowed for a running SQL select query "
+ + ", zero means there is no limit. Max time less than 1 second will be equal to zero.")
+ .defaultValue("0 seconds")
+ .required(true)
+ .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
+ .build();
+
+ protected List<PropertyDescriptor> propDescriptors;
+
+ public static final PropertyDescriptor DB_TYPE;
+
+ protected final static Map<String, DatabaseAdapter> dbAdapters = new HashMap<>();
+ protected final Map<String, Integer> columnTypeMap = new HashMap<>();
+
+    static {
+        // Discover every DatabaseAdapter registered with the ServiceLoader and index it by its name
+        for (final DatabaseAdapter adapter : ServiceLoader.load(DatabaseAdapter.class)) {
+            dbAdapters.put(adapter.getName(), adapter);
+        }
+        if (dbAdapters.isEmpty()) {
+            // Fail with an explicit message rather than an opaque NoSuchElementException during class initialization
+            throw new IllegalStateException("No DatabaseAdapter implementations were found via ServiceLoader");
+        }
+
+        // dbAdapters is a HashMap, so its "first" entry is an arbitrary pick. Prefer the generic adapter, which the
+        // property description recommends for most databases, and only fall back to an arbitrary adapter when no
+        // adapter is registered under that name.
+        final String genericAdapterName = "Generic";
+        final String defaultAdapterName = dbAdapters.containsKey(genericAdapterName)
+                ? genericAdapterName
+                : dbAdapters.keySet().iterator().next();
+
+        DB_TYPE = new PropertyDescriptor.Builder()
+                .name("db-fetch-db-type")
+                .displayName("Database Type")
+                .description("The type/flavor of database, used for generating database-specific code. In many cases the Generic type "
+                        + "should suffice, but some databases (such as Oracle) require custom SQL clauses. ")
+                .allowableValues(dbAdapters.keySet())
+                .defaultValue(defaultAdapterName)
+                .required(true)
+                .build();
+    }
+
+    /**
+     * Populates {@link #columnTypeMap} with the JDBC types of the configured maximum-value columns by running a
+     * query that returns no rows and inspecting the metadata of its result set.
+     *
+     * @param context the process context supplying the connection pool, table name, maximum-value columns and database type
+     * @throws ProcessException if the database cannot be queried or the query reports no columns
+     */
+    public void setup(final ProcessContext context) {
+        // Try to fill the columnTypeMap with the types of the desired max-value columns
+        final DBCPService dbcpService = context.getProperty(DBCP_SERVICE).asControllerService(DBCPService.class);
+        final String tableName = context.getProperty(TABLE_NAME).getValue();
+        final String maxValueColumnNames = context.getProperty(MAX_VALUE_COLUMN_NAMES).getValue();
+        final DatabaseAdapter dbAdapter = dbAdapters.get(context.getProperty(DB_TYPE).getValue());
+
+        // Try a query that returns no rows, for the purposes of getting metadata about the columns. It is possible
+        // to use DatabaseMetaData.getColumns(), but not all drivers support this, notably the schema-on-read
+        // approach as in Apache Drill
+        final String query = dbAdapter.getSelectStatement(tableName, maxValueColumnNames, "1 = 0", null, null, null);
+
+        // The result set is declared as a resource so that it is closed together with the statement and connection
+        try (final Connection con = dbcpService.getConnection();
+             final Statement st = con.createStatement();
+             final ResultSet resultSet = st.executeQuery(query)) {
+
+            final ResultSetMetaData resultSetMetaData = resultSet.getMetaData();
+            final int numCols = resultSetMetaData.getColumnCount();
+            if (numCols > 0) {
+                columnTypeMap.clear();
+                for (int i = 1; i <= numCols; i++) {
+                    // Keys are lower-cased so that later lookups do not depend on the case the driver reports
+                    final String colName = resultSetMetaData.getColumnName(i).toLowerCase();
+                    final int colType = resultSetMetaData.getColumnType(i);
+                    columnTypeMap.put(colName, colType);
+                }
+
+            } else {
+                throw new ProcessException("No columns found in table from those specified: " + maxValueColumnNames);
+            }
+
+        } catch (SQLException e) {
+            throw new ProcessException("Unable to communicate with database in order to determine column types", e);
+        }
+    }
+
+    /**
+     * Determines whether the value in the given column of the current row exceeds the maximum observed so far.
+     *
+     * @param resultSet      The result set, positioned on the row to inspect
+     * @param columnIndex    The index of the column to read
+     * @param type           The JDBC type (see {@link java.sql.Types}) of the column, or null if the column is not tracked
+     * @param maxValueString The largest value observed so far in its String form, or null if none has been observed
+     * @param databaseType   The name of the database adapter in use; "Oracle" changes how TIMESTAMP values are read
+     * @return The column value as a String if it is greater than maxValueString (or if maxValueString is null),
+     * otherwise null. Null is also returned when type is null or the column value is SQL NULL.
+     * @throws ParseException If maxValueString cannot be parsed for a DECIMAL/NUMERIC column
+     * @throws IOException    If the column type is not one for which a maximum value can be maintained
+     * @throws SQLException   If the column value cannot be read from the result set
+     */
+    protected static String getMaxValueFromRow(ResultSet resultSet,
+                                               int columnIndex,
+                                               Integer type,
+                                               String maxValueString,
+                                               String databaseType)
+            throws ParseException, IOException, SQLException {
+
+        // Skip any columns we're not keeping track of or whose value is null
+        if (type == null || resultSet.getObject(columnIndex) == null) {
+            return null;
+        }
+
+        switch (type) {
+            case CHAR:
+            case LONGNVARCHAR:
+            case LONGVARCHAR:
+            case NCHAR:
+            case NVARCHAR:
+            case VARCHAR:
+            case ROWID:
+                // Character data is compared lexicographically
+                String colStringValue = resultSet.getString(columnIndex);
+                if (maxValueString == null || colStringValue.compareTo(maxValueString) > 0) {
+                    return colStringValue;
+                }
+                break;
+
+            case INTEGER:
+            case SMALLINT:
+            case TINYINT:
+                Integer colIntValue = resultSet.getInt(columnIndex);
+                Integer maxIntValue = null;
+                if (maxValueString != null) {
+                    maxIntValue = Integer.valueOf(maxValueString);
+                }
+                if (maxIntValue == null || colIntValue > maxIntValue) {
+                    return colIntValue.toString();
+                }
+                break;
+
+            case BIGINT:
+                Long colLongValue = resultSet.getLong(columnIndex);
+                Long maxLongValue = null;
+                if (maxValueString != null) {
+                    maxLongValue = Long.valueOf(maxValueString);
+                }
+                if (maxLongValue == null || colLongValue > maxLongValue) {
+                    return colLongValue.toString();
+                }
+                break;
+
+            case FLOAT:
+            case REAL:
+            case DOUBLE:
+                Double colDoubleValue = resultSet.getDouble(columnIndex);
+                Double maxDoubleValue = null;
+                if (maxValueString != null) {
+                    maxDoubleValue = Double.valueOf(maxValueString);
+                }
+                if (maxDoubleValue == null || colDoubleValue > maxDoubleValue) {
+                    return colDoubleValue.toString();
+                }
+                break;
+
+            case DECIMAL:
+            case NUMERIC:
+                BigDecimal colBigDecimalValue = resultSet.getBigDecimal(columnIndex);
+                BigDecimal maxBigDecimalValue = null;
+                if (maxValueString != null) {
+                    // setParseBigDecimal(true) makes parse() return a BigDecimal, which the cast below relies on
+                    DecimalFormat df = new DecimalFormat();
+                    df.setParseBigDecimal(true);
+                    maxBigDecimalValue = (BigDecimal) df.parse(maxValueString);
+                }
+                if (maxBigDecimalValue == null || colBigDecimalValue.compareTo(maxBigDecimalValue) > 0) {
+                    return colBigDecimalValue.toString();
+                }
+                break;
+
+            case DATE:
+                Date rawColDateValue = resultSet.getDate(columnIndex);
+                java.sql.Date colDateValue = new java.sql.Date(rawColDateValue.getTime());
+                java.sql.Date maxDateValue = null;
+                if (maxValueString != null) {
+                    maxDateValue = java.sql.Date.valueOf(maxValueString);
+                }
+                if (maxDateValue == null || colDateValue.after(maxDateValue)) {
+                    return colDateValue.toString();
+                }
+                break;
+
+            case TIME:
+                // Read TIME columns with getTime(). getDate() is the accessor for DATE values and is not
+                // guaranteed to accept a TIME column or to preserve the time-of-day that is compared here.
+                java.sql.Time colTimeValue = resultSet.getTime(columnIndex);
+                java.sql.Time maxTimeValue = null;
+                if (maxValueString != null) {
+                    maxTimeValue = java.sql.Time.valueOf(maxValueString);
+                }
+                if (maxTimeValue == null || colTimeValue.after(maxTimeValue)) {
+                    return colTimeValue.toString();
+                }
+                break;
+
+            case TIMESTAMP:
+                // Oracle timestamp queries must use literals in java.sql.Date format
+                if ("Oracle".equals(databaseType)) {
+                    Date rawColOracleTimestampValue = resultSet.getDate(columnIndex);
+                    java.sql.Date oracleTimestampValue = new java.sql.Date(rawColOracleTimestampValue.getTime());
+                    java.sql.Date maxOracleTimestampValue = null;
+                    if (maxValueString != null) {
+                        maxOracleTimestampValue = java.sql.Date.valueOf(maxValueString);
+                    }
+                    if (maxOracleTimestampValue == null || oracleTimestampValue.after(maxOracleTimestampValue)) {
+                        return oracleTimestampValue.toString();
+                    }
+                } else {
+                    Timestamp rawColTimestampValue = resultSet.getTimestamp(columnIndex);
+                    java.sql.Timestamp colTimestampValue = new java.sql.Timestamp(rawColTimestampValue.getTime());
+                    java.sql.Timestamp maxTimestampValue = null;
+                    if (maxValueString != null) {
+                        maxTimestampValue = java.sql.Timestamp.valueOf(maxValueString);
+                    }
+                    if (maxTimestampValue == null || colTimestampValue.after(maxTimestampValue)) {
+                        return colTimestampValue.toString();
+                    }
+                }
+                break;
+
+            // These types have no meaningful ordering for tracking a maximum, so they share the default failure
+            case BIT:
+            case BOOLEAN:
+            case BINARY:
+            case VARBINARY:
+            case LONGVARBINARY:
+            case ARRAY:
+            case BLOB:
+            case CLOB:
+            default:
+                throw new IOException("Type for column " + columnIndex + " is not valid for maintaining maximum value");
+        }
+        // The column value did not exceed the current maximum
+        return null;
+    }
+
+    /**
+     * Returns a SQL literal for the given value based on its type. For example, values of character type need to be enclosed
+     * in single quotes, whereas values of numeric type should not be.
+     *
+     * @param type         The JDBC type for the desired literal
+     * @param value        The value to be converted to a SQL literal
+     * @param databaseType The name of the database adapter in use; "Oracle" selects the TO_DATE form for TIMESTAMP values
+     * @return A String representing the given value as a literal of the given type
+     */
+    protected static String getLiteralByType(int type, String value, String databaseType) {
+        // Format value based on column type. For example, strings and timestamps need to be quoted
+        switch (type) {
+            // For character values, put in single quotes
+            case CHAR:
+            case LONGNVARCHAR:
+            case LONGVARCHAR:
+            case NCHAR:
+            case NVARCHAR:
+            case VARCHAR:
+            case ROWID:
+                // Double any embedded single quote (standard SQL escaping) so that a value such as O'Brien cannot
+                // terminate the literal early. A null value keeps its previous rendering.
+                final String escapedValue = (value == null) ? null : value.replace("'", "''");
+                return "'" + escapedValue + "'";
+            // Date and time values are also written as quoted strings
+            case DATE:
+            case TIME:
+                return "'" + value + "'";
+            case TIMESTAMP:
+                // Timestamp literals in Oracle need to be cast with TO_DATE
+                if ("Oracle".equals(databaseType)) {
+                    return "to_date('" + value + "', 'yyyy-mm-dd HH24:MI:SS')";
+                } else {
+                    return "'" + value + "'";
+                }
+            // Else leave as is (numeric types, e.g.)
+            default:
+                return value;
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/nifi/blob/01cae237/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GenerateTableFetch.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GenerateTableFetch.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GenerateTableFetch.java
new file mode 100644
index 0000000..23a2e3c
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GenerateTableFetch.java
@@ -0,0 +1,256 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.standard;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
+import org.apache.nifi.annotation.behavior.Stateful;
+import org.apache.nifi.annotation.behavior.TriggerSerially;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.state.Scope;
+import org.apache.nifi.components.state.StateManager;
+import org.apache.nifi.components.state.StateMap;
+import org.apache.nifi.dbcp.DBCPService;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.ProcessSessionFactory;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.processors.standard.db.DatabaseAdapter;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.ResultSet;
+import java.sql.ResultSetMetaData;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.text.ParseException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+
+@TriggerSerially
+@InputRequirement(Requirement.INPUT_FORBIDDEN)
+@Tags({"sql", "select", "jdbc", "query", "database", "fetch", "generate"})
+@SeeAlso({QueryDatabaseTable.class, ExecuteSQL.class})
+@CapabilityDescription("Generates SQL select queries that fetch \"pages\" of rows from a table. The partition size property, along with the table's row count, "
+ + "determine the size and number of pages and generated FlowFiles. In addition, incremental fetching can be achieved by setting Maximum-Value Columns, "
+ + "which causes the processor to track the columns' maximum values, thus only fetching rows whose columns' values exceed the observed maximums. This "
+ + "processor is intended to be run on the Primary Node only.")
+@Stateful(scopes = Scope.CLUSTER, description = "After performing a query on the specified table, the maximum values for "
+ + "the specified column(s) will be retained for use in future executions of the query. This allows the Processor "
+ + "to fetch only those records that have max values greater than the retained values. This can be used for "
+ + "incremental fetching, fetching of newly added rows, etc. To clear the maximum values, clear the state of the processor "
+ + "per the State Management documentation")
+public class GenerateTableFetch extends AbstractDatabaseFetchProcessor {
+
+ public static final PropertyDescriptor PARTITION_SIZE = new PropertyDescriptor.Builder()
+ .name("gen-table-fetch-partition-size")
+ .displayName("Partition Size")
+ .description("The number of result rows to be fetched by each generated SQL statement. The total number of rows in "
+ + "the table divided by the partition size gives the number of SQL statements (i.e. FlowFiles) generated. A "
+ + "value of zero indicates that a single FlowFile is to be generated whose SQL statement will fetch all rows "
+ + "in the table.")
+ .defaultValue("10000")
+ .required(true)
+ .expressionLanguageSupported(false)
+ .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
+ .build();
+
+    /**
+     * Creates the processor with its single "success" relationship and its fixed, ordered list of
+     * supported properties. Both collections are read-only.
+     */
+    public GenerateTableFetch() {
+        relationships = Collections.singleton(REL_SUCCESS);
+
+        propDescriptors = Collections.unmodifiableList(Arrays.asList(
+                DBCP_SERVICE,
+                DB_TYPE,
+                TABLE_NAME,
+                COLUMN_NAMES,
+                MAX_VALUE_COLUMN_NAMES,
+                QUERY_TIMEOUT,
+                PARTITION_SIZE));
+    }
+
+    /**
+     * @return the read-only set of relationships built in the constructor
+     */
+    @Override
+    public Set<Relationship> getRelationships() {
+        return this.relationships;
+    }
+
+    /**
+     * @return the read-only, ordered list of property descriptors built in the constructor
+     */
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return this.propDescriptors;
+    }
+
+    /**
+     * Re-declares the inherited setup so that it carries the {@code @OnScheduled} annotation; the work
+     * itself is delegated to the superclass implementation.
+     *
+     * @param context the process context handed to the superclass setup
+     */
+    @Override
+    @OnScheduled
+    public void setup(final ProcessContext context) {
+        super.setup(context);
+    }
+
+    /**
+     * Counts the rows that pass the maximum-value filters, emits one FlowFile per "page" of those rows (each
+     * containing a SELECT statement) and then stores the newly observed maximum values in cluster-scoped state.
+     *
+     * @param context        provides the processor properties and the state manager
+     * @param sessionFactory used to create the session in which the FlowFiles are created
+     * @throws ProcessException declared by the overridden method; a ProcessException raised inside this method is
+     * logged, the session is rolled back and the processor yields
+     */
+    @Override
+    public void onTrigger(final ProcessContext context, final ProcessSessionFactory sessionFactory) throws ProcessException {
+        ProcessSession session = sessionFactory.createSession();
+        final ComponentLog logger = getLogger();
+
+        final DBCPService dbcpService = context.getProperty(DBCP_SERVICE).asControllerService(DBCPService.class);
+        final DatabaseAdapter dbAdapter = dbAdapters.get(context.getProperty(DB_TYPE).getValue());
+        final String tableName = context.getProperty(TABLE_NAME).getValue();
+        final String columnNames = context.getProperty(COLUMN_NAMES).getValue();
+        final String maxValueColumnNames = context.getProperty(MAX_VALUE_COLUMN_NAMES).getValue();
+        final int partitionSize = context.getProperty(PARTITION_SIZE).asInteger();
+
+        final StateManager stateManager = context.getStateManager();
+        final StateMap stateMap;
+
+        try {
+            stateMap = stateManager.getState(Scope.CLUSTER);
+        } catch (final IOException ioe) {
+            logger.error("Failed to retrieve observed maximum values from the State Manager. Will not perform "
+                    + "query until this is accomplished.", ioe);
+            context.yield();
+            return;
+        }
+        try {
+            // Make a mutable copy of the current state property map. This will be updated by the result row callback, and eventually
+            // set as the current state map (after the session has been committed)
+            final Map<String, String> statePropertyMap = new HashMap<>(stateMap.toMap());
+
+            // Build a WHERE clause with maximum-value columns (if they exist), and a list of column names that will contain MAX(<column>) aliases. The
+            // executed SQL query will retrieve the count of all records after the filter(s) have been applied, as well as the new maximum values for the
+            // specified columns. This allows the processor to generate the correctly partitioned SQL statements as well as to update the state with the
+            // latest observed maximum values.
+            List<String> maxValueColumnNameList = StringUtils.isEmpty(maxValueColumnNames)
+                    ? new ArrayList<>(0)
+                    : Arrays.asList(maxValueColumnNames.split("\\s*,\\s*"));
+            List<String> maxValueClauses = new ArrayList<>(maxValueColumnNameList.size());
+
+            List<String> maxValueSelectColumns = new ArrayList<>(maxValueColumnNameList.size() + 1);
+            maxValueSelectColumns.add("COUNT(*)");
+
+            // For each maximum-value column, get a WHERE filter and a MAX(column) alias
+            maxValueColumnNameList.forEach(colName -> {
+                maxValueSelectColumns.add("MAX(" + colName + ") " + colName);
+                String maxValue = statePropertyMap.get(colName.toLowerCase());
+                if (!StringUtils.isEmpty(maxValue)) {
+                    Integer type = columnTypeMap.get(colName.toLowerCase());
+                    if (type == null) {
+                        // This shouldn't happen as we are populating columnTypeMap when the processor is scheduled.
+                        throw new IllegalArgumentException("No column type found for: " + colName);
+                    }
+                    // Add a condition for the WHERE clause
+                    maxValueClauses.add(colName + " > " + getLiteralByType(type, maxValue, dbAdapter.getName()));
+                }
+            });
+
+            // The same WHERE clause is used for the count query and for every generated page query
+            final String whereClause = StringUtils.join(maxValueClauses, " AND ");
+            final String columnsClause = StringUtils.join(maxValueSelectColumns, ", ");
+
+            // Build a SELECT query with maximum-value columns (if present)
+            final String selectQuery = dbAdapter.getSelectStatement(tableName, columnsClause, whereClause, null, null, null);
+            int rowCount = 0;
+
+            try (final Connection con = dbcpService.getConnection();
+                 final Statement st = con.createStatement()) {
+
+                final Integer queryTimeout = context.getProperty(QUERY_TIMEOUT).asTimePeriod(TimeUnit.SECONDS).intValue();
+                st.setQueryTimeout(queryTimeout); // timeout in seconds
+
+                logger.debug("Executing {}", new Object[]{selectQuery});
+
+                // The result set is a resource of its own so that it is closed as soon as it has been read
+                try (final ResultSet resultSet = st.executeQuery(selectQuery)) {
+                    if (resultSet.next()) {
+                        // Total row count is in the first column
+                        rowCount = resultSet.getInt(1);
+
+                        // Update the state map with the newly-observed maximum values
+                        ResultSetMetaData rsmd = resultSet.getMetaData();
+                        for (int i = 2; i <= rsmd.getColumnCount(); i++) {
+                            String resultColumnName = rsmd.getColumnName(i).toLowerCase();
+                            int type = rsmd.getColumnType(i);
+                            try {
+                                String newMaxValue = getMaxValueFromRow(resultSet, i, type, statePropertyMap.get(resultColumnName.toLowerCase()), dbAdapter.getName());
+                                if (newMaxValue != null) {
+                                    statePropertyMap.put(resultColumnName, newMaxValue);
+                                }
+                            } catch (ParseException | IOException pie) {
+                                // Fail the whole thing here before we start creating flow files and such
+                                throw new ProcessException(pie);
+                            }
+                        }
+                    } else {
+                        // Something is very wrong here, one row (even if count is zero) should be returned
+                        throw new SQLException("No rows returned from metadata query: " + selectQuery);
+                    }
+                }
+            } catch (SQLException e) {
+                logger.error("Unable to execute SQL select query {} due to {}", new Object[]{selectQuery, e});
+                throw new ProcessException(e);
+            }
+
+            final int numberOfFetches;
+            if (partitionSize == 0) {
+                // A partition size of zero means a single statement fetches all rows, so at most one FlowFile is
+                // generated. As with the partitioned case, nothing is generated when no rows matched.
+                numberOfFetches = (rowCount == 0) ? 0 : 1;
+            } else {
+                // Round up so that a final, partially filled page is still fetched
+                numberOfFetches = (rowCount / partitionSize) + (rowCount % partitionSize == 0 ? 0 : 1);
+            }
+
+            // Generate SQL statements to read "pages" of data
+            for (int i = 0; i < numberOfFetches; i++) {
+                FlowFile sqlFlowFile;
+
+                Integer limit = partitionSize == 0 ? null : partitionSize;
+                Integer offset = partitionSize == 0 ? null : i * partitionSize;
+                final String query = dbAdapter.getSelectStatement(tableName, columnNames, whereClause, null, limit, offset);
+                sqlFlowFile = session.create();
+                sqlFlowFile = session.write(sqlFlowFile, out -> {
+                    out.write(query.getBytes());
+                });
+                session.transfer(sqlFlowFile, REL_SUCCESS);
+            }
+
+            session.commit();
+            try {
+                // Update the state
+                stateManager.setState(statePropertyMap, Scope.CLUSTER);
+            } catch (IOException ioe) {
+                logger.error("{} failed to update State Manager, observed maximum values will not be recorded. "
+                                + "Also, any generated SQL statements may be duplicated.",
+                        new Object[]{this, ioe});
+            }
+        } catch (final ProcessException pe) {
+            // Log the cause of the ProcessException if it is available
+            Throwable t = (pe.getCause() == null ? pe : pe.getCause());
+            logger.error("Error during processing: {}", new Object[]{t.getMessage()}, t);
+            session.rollback();
+            context.yield();
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/nifi/blob/01cae237/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryDatabaseTable.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryDatabaseTable.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryDatabaseTable.java
index e749696..ea24dd0 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryDatabaseTable.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryDatabaseTable.java
@@ -32,31 +32,26 @@ import org.apache.nifi.components.state.StateMap;
import org.apache.nifi.dbcp.DBCPService;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.logging.ComponentLog;
-import org.apache.nifi.processor.AbstractSessionFactoryProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.ProcessSessionFactory;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
-import org.apache.nifi.processor.io.OutputStreamCallback;
import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.processors.standard.db.DatabaseAdapter;
import org.apache.nifi.processors.standard.util.JdbcCommon;
import org.apache.nifi.util.StopWatch;
import java.io.IOException;
-import java.io.OutputStream;
-import java.math.BigDecimal;
import java.sql.Connection;
import java.sql.DatabaseMetaData;
-import java.sql.Date;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import java.sql.Statement;
-import java.sql.Timestamp;
-import java.text.DecimalFormat;
import java.text.ParseException;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
@@ -64,36 +59,9 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
-
-import static java.sql.Types.ARRAY;
-import static java.sql.Types.BIGINT;
-import static java.sql.Types.BINARY;
-import static java.sql.Types.BIT;
-import static java.sql.Types.BLOB;
-import static java.sql.Types.BOOLEAN;
-import static java.sql.Types.CHAR;
-import static java.sql.Types.CLOB;
-import static java.sql.Types.DATE;
-import static java.sql.Types.DECIMAL;
-import static java.sql.Types.DOUBLE;
-import static java.sql.Types.FLOAT;
-import static java.sql.Types.INTEGER;
-import static java.sql.Types.LONGNVARCHAR;
-import static java.sql.Types.LONGVARBINARY;
-import static java.sql.Types.LONGVARCHAR;
-import static java.sql.Types.NCHAR;
-import static java.sql.Types.NUMERIC;
-import static java.sql.Types.NVARCHAR;
-import static java.sql.Types.REAL;
-import static java.sql.Types.ROWID;
-import static java.sql.Types.SMALLINT;
-import static java.sql.Types.TIME;
-import static java.sql.Types.TIMESTAMP;
-import static java.sql.Types.TINYINT;
-import static java.sql.Types.VARBINARY;
-import static java.sql.Types.VARCHAR;
import java.util.concurrent.atomic.AtomicLong;
+
@EventDriven
@InputRequirement(Requirement.INPUT_FORBIDDEN)
@Tags({"sql", "select", "jdbc", "query", "database"})
@@ -108,72 +76,10 @@ import java.util.concurrent.atomic.AtomicLong;
+ "incremental fetching, fetching of newly added rows, etc. To clear the maximum values, clear the state of the processor "
+ "per the State Management documentation")
@WritesAttribute(attribute = "querydbtable.row.count")
-public class QueryDatabaseTable extends AbstractSessionFactoryProcessor {
+public class QueryDatabaseTable extends AbstractDatabaseFetchProcessor {
public static final String RESULT_ROW_COUNT = "querydbtable.row.count";
- public static final String SQL_PREPROCESS_STRATEGY_NONE = "None";
- public static final String SQL_PREPROCESS_STRATEGY_ORACLE = "Oracle";
-
- // Relationships
- public static final Relationship REL_SUCCESS = new Relationship.Builder()
- .name("success")
- .description("Successfully created FlowFile from SQL query result set.")
- .build();
-
- private final Set<Relationship> relationships;
-
- public static final PropertyDescriptor DBCP_SERVICE = new PropertyDescriptor.Builder()
- .name("Database Connection Pooling Service")
- .description("The Controller Service that is used to obtain a connection to the database.")
- .required(true)
- .identifiesControllerService(DBCPService.class)
- .build();
-
- public static final PropertyDescriptor TABLE_NAME = new PropertyDescriptor.Builder()
- .name("Table Name")
- .description("The name of the database table to be queried.")
- .required(true)
- .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
- .build();
-
- public static final PropertyDescriptor COLUMN_NAMES = new PropertyDescriptor.Builder()
- .name("Columns to Return")
- .description("A comma-separated list of column names to be used in the query. If your database requires "
- + "special treatment of the names (quoting, e.g.), each name should include such treatment. If no "
- + "column names are supplied, all columns in the specified table will be returned.")
- .required(false)
- .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
- .build();
-
- public static final PropertyDescriptor MAX_VALUE_COLUMN_NAMES = new PropertyDescriptor.Builder()
- .name("Maximum-value Columns")
- .description("A comma-separated list of column names. The processor will keep track of the maximum value "
- + "for each column that has been returned since the processor started running. This can be used to "
- + "retrieve only those rows that have been added/updated since the last retrieval. Note that some "
- + "JDBC types such as bit/boolean are not conducive to maintaining maximum value, so columns of these "
- + "types should not be listed in this property, and will result in error(s) during processing.")
- .required(false)
- .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
- .build();
-
- public static final PropertyDescriptor QUERY_TIMEOUT = new PropertyDescriptor.Builder()
- .name("Max Wait Time")
- .description("The maximum amount of time allowed for a running SQL select query "
- + ", zero means there is no limit. Max time less than 1 second will be equal to zero.")
- .defaultValue("0 seconds")
- .required(true)
- .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
- .build();
-
- public static final PropertyDescriptor SQL_PREPROCESS_STRATEGY = new PropertyDescriptor.Builder()
- .name("SQL Pre-processing Strategy")
- .description("The strategy to employ when generating the SQL for querying the table. A strategy may include "
- + "custom or database-specific code, such as the treatment of time/date formats.")
- .required(true)
- .allowableValues(SQL_PREPROCESS_STRATEGY_NONE, SQL_PREPROCESS_STRATEGY_ORACLE)
- .defaultValue("None")
- .build();
public static final PropertyDescriptor FETCH_SIZE = new PropertyDescriptor.Builder()
.name("Fetch Size")
@@ -184,11 +90,6 @@ public class QueryDatabaseTable extends AbstractSessionFactoryProcessor {
.addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
.build();
-
- private final List<PropertyDescriptor> propDescriptors;
-
- protected final Map<String, Integer> columnTypeMap = new HashMap<>();
-
public QueryDatabaseTable() {
final Set<Relationship> r = new HashSet<>();
r.add(REL_SUCCESS);
@@ -196,11 +97,11 @@ public class QueryDatabaseTable extends AbstractSessionFactoryProcessor {
final List<PropertyDescriptor> pds = new ArrayList<>();
pds.add(DBCP_SERVICE);
+ pds.add(DB_TYPE);
pds.add(TABLE_NAME);
pds.add(COLUMN_NAMES);
pds.add(MAX_VALUE_COLUMN_NAMES);
pds.add(QUERY_TIMEOUT);
- pds.add(SQL_PREPROCESS_STRATEGY);
pds.add(FETCH_SIZE);
propDescriptors = Collections.unmodifiableList(pds);
}
@@ -217,36 +118,7 @@ public class QueryDatabaseTable extends AbstractSessionFactoryProcessor {
@OnScheduled
public void setup(final ProcessContext context) {
- // Try to fill the columnTypeMap with the types of the desired max-value columns
- final DBCPService dbcpService = context.getProperty(DBCP_SERVICE).asControllerService(DBCPService.class);
- final String tableName = context.getProperty(TABLE_NAME).getValue();
- final String maxValueColumnNames = context.getProperty(MAX_VALUE_COLUMN_NAMES).getValue();
-
- try (final Connection con = dbcpService.getConnection();
- final Statement st = con.createStatement()) {
-
- // Try a query that returns no rows, for the purposes of getting metadata about the columns. It is possible
- // to use DatabaseMetaData.getColumns(), but not all drivers support this, notably the schema-on-read
- // approach as in Apache Drill
- String query = getSelectFromClause(tableName, maxValueColumnNames).append(" WHERE 1 = 0").toString();
- ResultSet resultSet = st.executeQuery(query);
- ResultSetMetaData resultSetMetaData = resultSet.getMetaData();
- int numCols = resultSetMetaData.getColumnCount();
- if (numCols > 0) {
- columnTypeMap.clear();
- for (int i = 1; i <= numCols; i++) {
- String colName = resultSetMetaData.getColumnName(i).toLowerCase();
- int colType = resultSetMetaData.getColumnType(i);
- columnTypeMap.put(colName, colType);
- }
-
- } else {
- throw new ProcessException("No columns found in table from those specified: " + maxValueColumnNames);
- }
-
- } catch (SQLException e) {
- throw new ProcessException("Unable to communicate with database in order to determine column types", e);
- }
+ super.setup(context);
}
@Override
@@ -257,10 +129,10 @@ public class QueryDatabaseTable extends AbstractSessionFactoryProcessor {
final ComponentLog logger = getLogger();
final DBCPService dbcpService = context.getProperty(DBCP_SERVICE).asControllerService(DBCPService.class);
+ final DatabaseAdapter dbAdapter = dbAdapters.get(context.getProperty(DB_TYPE).getValue());
final String tableName = context.getProperty(TABLE_NAME).getValue();
final String columnNames = context.getProperty(COLUMN_NAMES).getValue();
final String maxValueColumnNames = context.getProperty(MAX_VALUE_COLUMN_NAMES).getValue();
- final String preProcessStrategy = context.getProperty(SQL_PREPROCESS_STRATEGY).getValue();
final Integer fetchSize = context.getProperty(FETCH_SIZE).asInteger();
final StateManager stateManager = context.getStateManager();
@@ -278,7 +150,10 @@ public class QueryDatabaseTable extends AbstractSessionFactoryProcessor {
// set as the current state map (after the session has been committed)
final Map<String, String> statePropertyMap = new HashMap<>(stateMap.toMap());
- final String selectQuery = getQuery(tableName, columnNames, getColumns(maxValueColumnNames), stateMap, preProcessStrategy);
+ List<String> maxValueColumnNameList = StringUtils.isEmpty(maxValueColumnNames)
+ ? null
+ : Arrays.asList(maxValueColumnNames.split("\\s*,\\s*"));
+ final String selectQuery = getQuery(dbAdapter, tableName, columnNames, maxValueColumnNameList, stateMap);
final StopWatch stopWatch = new StopWatch(true);
try (final Connection con = dbcpService.getConnection();
@@ -299,19 +174,16 @@ public class QueryDatabaseTable extends AbstractSessionFactoryProcessor {
final AtomicLong nrOfRows = new AtomicLong(0L);
fileToProcess = session.create();
- fileToProcess = session.write(fileToProcess, new OutputStreamCallback() {
- @Override
- public void process(final OutputStream out) throws IOException {
- try {
- logger.debug("Executing query {}", new Object[]{selectQuery});
- final ResultSet resultSet = st.executeQuery(selectQuery);
- // Max values will be updated in the state property map by the callback
- final MaxValueResultSetRowCollector maxValCollector = new MaxValueResultSetRowCollector(statePropertyMap, preProcessStrategy);
- nrOfRows.set(JdbcCommon.convertToAvroStream(resultSet, out, tableName, maxValCollector));
-
- } catch (final SQLException e) {
- throw new ProcessException("Error during database query or conversion of records to Avro", e);
- }
+ fileToProcess = session.write(fileToProcess, out -> {
+ try {
+ logger.debug("Executing query {}", new Object[]{selectQuery});
+ final ResultSet resultSet = st.executeQuery(selectQuery);
+ // Max values will be updated in the state property map by the callback
+ final MaxValueResultSetRowCollector maxValCollector = new MaxValueResultSetRowCollector(statePropertyMap, dbAdapter);
+ nrOfRows.set(JdbcCommon.convertToAvroStream(resultSet, out, tableName, maxValCollector));
+
+ } catch (final SQLException e) {
+ throw new ProcessException("Error during database query or conversion of records to Avro", e);
}
});
@@ -355,29 +227,12 @@ public class QueryDatabaseTable extends AbstractSessionFactoryProcessor {
}
}
- protected List<String> getColumns(String commaSeparatedColumnList) {
- if (StringUtils.isEmpty(commaSeparatedColumnList)) {
- return Collections.emptyList();
- }
- final String[] columns = commaSeparatedColumnList.split(",");
- final List<String> columnList = new ArrayList<>(columns.length);
- for (String column : columns) {
- if (column != null) {
- String trimmedColumn = column.trim();
- if (!StringUtils.isEmpty(trimmedColumn)) {
- columnList.add(trimmedColumn);
- }
- }
- }
- return columnList;
- }
-
- protected String getQuery(String tableName, String columnNames, List<String> maxValColumnNames,
- StateMap stateMap, String preProcessStrategy) {
+ protected String getQuery(DatabaseAdapter dbAdapter, String tableName, String columnNames, List<String> maxValColumnNames,
+ StateMap stateMap) {
if (StringUtils.isEmpty(tableName)) {
throw new IllegalArgumentException("Table name must be specified");
}
- final StringBuilder query = new StringBuilder(getSelectFromClause(tableName, columnNames));
+ final StringBuilder query = new StringBuilder(dbAdapter.getSelectStatement(tableName, columnNames, null, null, null, null));
// Check state map for last max values
if (stateMap != null && stateMap.getVersion() != -1 && maxValColumnNames != null) {
@@ -392,7 +247,7 @@ public class QueryDatabaseTable extends AbstractSessionFactoryProcessor {
throw new IllegalArgumentException("No column type found for: " + colName);
}
// Add a condition for the WHERE clause
- whereClauses.add(colName + " > " + getLiteralByType(type, maxValue, preProcessStrategy));
+ whereClauses.add(colName + " > " + getLiteralByType(type, maxValue, dbAdapter.getName()));
}
}
if (!whereClauses.isEmpty()) {
@@ -404,67 +259,13 @@ public class QueryDatabaseTable extends AbstractSessionFactoryProcessor {
return query.toString();
}
- /**
- * Returns a basic SELECT ... FROM clause with the given column names and table name. If no column names are found,
- * the wildcard (*) is used to select all columns.
- *
- * @param tableName The name of the table to select from
- * @param columnNames A comma-separated list of column names to select from the table
- * @return A SQL select statement representing a query of the given column names from the given table
- */
- protected StringBuilder getSelectFromClause(String tableName, String columnNames) {
- final StringBuilder query = new StringBuilder("SELECT ");
- if (StringUtils.isEmpty(columnNames) || columnNames.trim().equals("*")) {
- query.append("*");
- } else {
- query.append(columnNames);
- }
- query.append(" FROM ");
- query.append(tableName);
- return query;
- }
-
- /**
- * Returns a SQL literal for the given value based on its type. For example, values of character type need to be enclosed
- * in single quotes, whereas values of numeric type should not be.
- *
- * @param type The JDBC type for the desired literal
- * @param value The value to be converted to a SQL literal
- * @return A String representing the given value as a literal of the given type
- */
- protected String getLiteralByType(int type, String value, String preProcessStrategy) {
- // Format value based on column type. For example, strings and timestamps need to be quoted
- switch (type) {
- // For string-represented values, put in single quotes
- case CHAR:
- case LONGNVARCHAR:
- case LONGVARCHAR:
- case NCHAR:
- case NVARCHAR:
- case VARCHAR:
- case ROWID:
- case DATE:
- case TIME:
- return "'" + value + "'";
- case TIMESTAMP:
- // Timestamp literals in Oracle need to be cast with TO_DATE
- if (SQL_PREPROCESS_STRATEGY_ORACLE.equals(preProcessStrategy)) {
- return "to_date('" + value + "', 'yyyy-mm-dd HH24:MI:SS')";
- } else {
- return "'" + value + "'";
- }
- // Else leave as is (numeric types, e.g.)
- default:
- return value;
- }
- }
protected class MaxValueResultSetRowCollector implements JdbcCommon.ResultSetRowCallback {
- String preProcessStrategy;
+ DatabaseAdapter dbAdapter;
Map<String, String> newColMap;
- public MaxValueResultSetRowCollector(Map<String, String> stateMap, String preProcessStrategy) {
- this.preProcessStrategy = preProcessStrategy;
+ public MaxValueResultSetRowCollector(Map<String, String> stateMap, DatabaseAdapter dbAdapter) {
+ this.dbAdapter = dbAdapter;
newColMap = stateMap;
}
@@ -486,130 +287,9 @@ public class QueryDatabaseTable extends AbstractSessionFactoryProcessor {
continue;
}
String maxValueString = newColMap.get(colName);
- switch (type) {
- case CHAR:
- case LONGNVARCHAR:
- case LONGVARCHAR:
- case NCHAR:
- case NVARCHAR:
- case VARCHAR:
- case ROWID:
- String colStringValue = resultSet.getString(i);
- if (maxValueString == null || colStringValue.compareTo(maxValueString) > 0) {
- newColMap.put(colName, colStringValue);
- }
- break;
-
- case INTEGER:
- case SMALLINT:
- case TINYINT:
- Integer colIntValue = resultSet.getInt(i);
- Integer maxIntValue = null;
- if (maxValueString != null) {
- maxIntValue = Integer.valueOf(maxValueString);
- }
- if (maxIntValue == null || colIntValue > maxIntValue) {
- newColMap.put(colName, colIntValue.toString());
- }
- break;
-
- case BIGINT:
- Long colLongValue = resultSet.getLong(i);
- Long maxLongValue = null;
- if (maxValueString != null) {
- maxLongValue = Long.valueOf(maxValueString);
- }
- if (maxLongValue == null || colLongValue > maxLongValue) {
- newColMap.put(colName, colLongValue.toString());
- }
- break;
-
- case FLOAT:
- case REAL:
- case DOUBLE:
- Double colDoubleValue = resultSet.getDouble(i);
- Double maxDoubleValue = null;
- if (maxValueString != null) {
- maxDoubleValue = Double.valueOf(maxValueString);
- }
- if (maxDoubleValue == null || colDoubleValue > maxDoubleValue) {
- newColMap.put(colName, colDoubleValue.toString());
- }
- break;
-
- case DECIMAL:
- case NUMERIC:
- BigDecimal colBigDecimalValue = resultSet.getBigDecimal(i);
- BigDecimal maxBigDecimalValue = null;
- if (maxValueString != null) {
- DecimalFormat df = new DecimalFormat();
- df.setParseBigDecimal(true);
- maxBigDecimalValue = (BigDecimal) df.parse(maxValueString);
- }
- if (maxBigDecimalValue == null || colBigDecimalValue.compareTo(maxBigDecimalValue) > 0) {
- newColMap.put(colName, colBigDecimalValue.toString());
- }
- break;
-
- case DATE:
- Date rawColDateValue = resultSet.getDate(i);
- java.sql.Date colDateValue = new java.sql.Date(rawColDateValue.getTime());
- java.sql.Date maxDateValue = null;
- if (maxValueString != null) {
- maxDateValue = java.sql.Date.valueOf(maxValueString);
- }
- if (maxDateValue == null || colDateValue.after(maxDateValue)) {
- newColMap.put(colName, colDateValue.toString());
- }
- break;
-
- case TIME:
- Date rawColTimeValue = resultSet.getDate(i);
- java.sql.Time colTimeValue = new java.sql.Time(rawColTimeValue.getTime());
- java.sql.Time maxTimeValue = null;
- if (maxValueString != null) {
- maxTimeValue = java.sql.Time.valueOf(maxValueString);
- }
- if (maxTimeValue == null || colTimeValue.after(maxTimeValue)) {
- newColMap.put(colName, colTimeValue.toString());
- }
- break;
-
- case TIMESTAMP:
- // Oracle timestamp queries must use literals in java.sql.Date format
- if (SQL_PREPROCESS_STRATEGY_ORACLE.equals(preProcessStrategy)) {
- Date rawColOracleTimestampValue = resultSet.getDate(i);
- java.sql.Date oracleTimestampValue = new java.sql.Date(rawColOracleTimestampValue.getTime());
- java.sql.Date maxOracleTimestampValue = null;
- if (maxValueString != null) {
- maxOracleTimestampValue = java.sql.Date.valueOf(maxValueString);
- }
- if (maxOracleTimestampValue == null || oracleTimestampValue.after(maxOracleTimestampValue)) {
- newColMap.put(colName, oracleTimestampValue.toString());
- }
- } else {
- Timestamp rawColTimestampValue = resultSet.getTimestamp(i);
- java.sql.Timestamp colTimestampValue = new java.sql.Timestamp(rawColTimestampValue.getTime());
- java.sql.Timestamp maxTimestampValue = null;
- if (maxValueString != null) {
- maxTimestampValue = java.sql.Timestamp.valueOf(maxValueString);
- }
- if (maxTimestampValue == null || colTimestampValue.after(maxTimestampValue)) {
- newColMap.put(colName, colTimestampValue.toString());
- }
- }
- break;
-
- case BIT:
- case BOOLEAN:
- case BINARY:
- case VARBINARY:
- case LONGVARBINARY:
- case ARRAY:
- case BLOB:
- case CLOB:
- default:
- throw new IOException("Type " + meta.getColumnTypeName(i) + " is not valid for maintaining maximum value");
+ String newMaxValueString = getMaxValueFromRow(resultSet, i, type, maxValueString, dbAdapter.getName());
+ if (newMaxValueString != null) {
+ newColMap.put(colName, newMaxValueString);
}
}
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/01cae237/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/db/DatabaseAdapter.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/db/DatabaseAdapter.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/db/DatabaseAdapter.java
new file mode 100644
index 0000000..21ab331
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/db/DatabaseAdapter.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.standard.db;
+
+/**
+ * Interface for RDBMS/JDBC-specific code.
+ */
+public interface DatabaseAdapter {
+ /** Returns this adapter's name; presumably the key by which a processor selects the adapter (e.g. from its database-type property) -- TODO confirm. */
+ String getName();
+
+ /**
+ * Returns a SQL SELECT statement with the given clauses applied.
+ *
+ * @param tableName The name of the table to fetch rows from
+ * @param columnNames The names of the columns to fetch from the table
+ * @param whereClause The filter to apply to the statement. This should not include the WHERE keyword. May be null if no filter is wanted
+ * @param orderByClause The columns/clause used for ordering the result rows. This should not include the ORDER BY keywords. May be null if no ordering is wanted
+ * @param limit The value for the LIMIT clause (i.e. the number of rows to return). May be null if no limit is wanted
+ * @param offset The value for the OFFSET clause (i.e. the number of rows to skip). May be null if no rows are to be skipped
+ * @return A String containing a SQL SELECT statement with the given clauses applied
+ */
+ String getSelectStatement(String tableName, String columnNames, String whereClause, String orderByClause, Integer limit, Integer offset);
+}
http://git-wip-us.apache.org/repos/asf/nifi/blob/01cae237/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/db/impl/GenericDatabaseAdapter.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/db/impl/GenericDatabaseAdapter.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/db/impl/GenericDatabaseAdapter.java
new file mode 100644
index 0000000..5beae29
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/db/impl/GenericDatabaseAdapter.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.standard.db.impl;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.nifi.processors.standard.db.DatabaseAdapter;
+
+/**
+ * A generic database adapter that generates ANSI SQL.
+ */
+public class GenericDatabaseAdapter implements DatabaseAdapter {
+    @Override
+    public String getName() {
+        return "Generic";
+    }
+
+    /**
+     * Builds {@code SELECT cols FROM table [WHERE ...] [ORDER BY ...] [LIMIT n] [OFFSET m]}.
+     *
+     * @param tableName     the table to select from; must not be null or empty
+     * @param columnNames   the columns to select; null, empty or "*" selects all columns
+     * @param whereClause   optional filter, without the WHERE keyword
+     * @param orderByClause optional ordering, without the ORDER BY keywords
+     * @param limit         value for the LIMIT clause, or null to omit the clause
+     * @param offset        value for the OFFSET clause; the clause is omitted when null or not positive
+     * @return the generated SQL statement
+     * @throws IllegalArgumentException if tableName is null or empty
+     */
+    @Override
+    public String getSelectStatement(String tableName, String columnNames, String whereClause, String orderByClause, Integer limit, Integer offset) {
+        if (StringUtils.isEmpty(tableName)) {
+            throw new IllegalArgumentException("Table name cannot be null or empty");
+        }
+        final String projection = (StringUtils.isEmpty(columnNames) || columnNames.trim().equals("*")) ? "*" : columnNames;
+        final StringBuilder query = new StringBuilder("SELECT ").append(projection).append(" FROM ").append(tableName);
+        if (!StringUtils.isEmpty(whereClause)) {
+            query.append(" WHERE ").append(whereClause);
+        }
+        if (!StringUtils.isEmpty(orderByClause)) {
+            query.append(" ORDER BY ").append(orderByClause);
+        }
+        if (limit != null) {
+            query.append(" LIMIT ").append(limit);
+        }
+        if (offset != null && offset > 0) {
+            query.append(" OFFSET ").append(offset);
+        }
+        return query.toString();
+    }
+}
http://git-wip-us.apache.org/repos/asf/nifi/blob/01cae237/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/db/impl/OracleDatabaseAdapter.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/db/impl/OracleDatabaseAdapter.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/db/impl/OracleDatabaseAdapter.java
new file mode 100644
index 0000000..ffe54dc
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/db/impl/OracleDatabaseAdapter.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.standard.db.impl;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.nifi.processors.standard.db.DatabaseAdapter;
+
+/**
+ * A DatabaseAdapter that generates Oracle-compliant SQL.
+ */
+public class OracleDatabaseAdapter implements DatabaseAdapter {
+    /** Returns the name identifying this adapter, {@code "Oracle"}. */
+    @Override
+    public String getName() {
+        return "Oracle";
+    }
+
+    /**
+     * Builds a SELECT statement in which a limit and/or offset is applied by wrapping the
+     * query in nested SELECTs that filter on ROWNUM, rather than by LIMIT/OFFSET keywords.
+     *
+     * @param tableName     the table to select from; must not be null or empty
+     * @param columnNames   the columns to select; null, empty or "*" selects all columns
+     * @param whereClause   optional filter, without the WHERE keyword
+     * @param orderByClause optional ordering, without the ORDER BY keywords
+     * @param limit         maximum number of rows to return, or null for no limit
+     * @param offset        number of rows to skip, or null to skip none
+     * @return the generated SQL statement
+     * @throws IllegalArgumentException if tableName is null or empty
+     */
+    @Override
+    public String getSelectStatement(String tableName, String columnNames, String whereClause, String orderByClause, Integer limit, Integer offset) {
+        if (StringUtils.isEmpty(tableName)) {
+            throw new IllegalArgumentException("Table name cannot be null or empty");
+        }
+
+        // The same projection is used by the innermost and the outermost SELECT
+        final String projection = (StringUtils.isEmpty(columnNames) || columnNames.trim().equals("*")) ? "*" : columnNames;
+        final boolean nestedSelect = (limit != null || offset != null);
+
+        final StringBuilder query = new StringBuilder();
+        if (nestedSelect) {
+            // Need a nested SELECT query here in order to use ROWNUM to limit the results
+            query.append("SELECT ").append(projection).append(" FROM (SELECT a.*, ROWNUM rnum FROM (");
+        }
+
+        query.append("SELECT ").append(projection).append(" FROM ").append(tableName);
+        if (!StringUtils.isEmpty(whereClause)) {
+            query.append(" WHERE ").append(whereClause);
+        }
+        if (!StringUtils.isEmpty(orderByClause)) {
+            // Must append orderByClause (not whereClause) so the requested ordering is actually applied
+            query.append(" ORDER BY ").append(orderByClause);
+        }
+
+        if (nestedSelect) {
+            final long offsetVal = (offset == null) ? 0L : offset;
+            query.append(") a");
+            if (limit != null) {
+                // Upper ROWNUM bound is offset + limit; computed as long so large values cannot overflow
+                query.append(" WHERE ROWNUM <= ").append(offsetVal + limit);
+            }
+            query.append(") WHERE rnum > ").append(offsetVal);
+        }
+
+        return query.toString();
+    }
+}
http://git-wip-us.apache.org/repos/asf/nifi/blob/01cae237/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
index 6ed7f5b..2ff466e 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
@@ -43,6 +43,7 @@ org.apache.nifi.processors.standard.HashContent
org.apache.nifi.processors.standard.IdentifyMimeType
org.apache.nifi.processors.standard.InvokeHTTP
org.apache.nifi.processors.standard.JoltTransformJSON
+org.apache.nifi.processors.standard.GenerateTableFetch
org.apache.nifi.processors.standard.GetJMSQueue
org.apache.nifi.processors.standard.GetJMSTopic
org.apache.nifi.processors.standard.ListFile
http://git-wip-us.apache.org/repos/asf/nifi/blob/01cae237/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processors.standard.db.DatabaseAdapter
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processors.standard.db.DatabaseAdapter b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processors.standard.db.DatabaseAdapter
new file mode 100644
index 0000000..0e3685a
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processors.standard.db.DatabaseAdapter
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+org.apache.nifi.processors.standard.db.impl.GenericDatabaseAdapter
+org.apache.nifi.processors.standard.db.impl.OracleDatabaseAdapter
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/nifi/blob/01cae237/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/QueryDatabaseTableTest.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/QueryDatabaseTableTest.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/QueryDatabaseTableTest.java
index f932e4d..5aec899 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/QueryDatabaseTableTest.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/QueryDatabaseTableTest.java
@@ -26,11 +26,16 @@ import org.apache.nifi.components.state.StateManager;
import org.apache.nifi.controller.AbstractControllerService;
import org.apache.nifi.dbcp.DBCPService;
import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processors.standard.db.DatabaseAdapter;
+import org.apache.nifi.processors.standard.db.impl.GenericDatabaseAdapter;
+import org.apache.nifi.processors.standard.db.impl.OracleDatabaseAdapter;
import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
+import org.apache.nifi.util.file.FileUtils;
import org.fusesource.hawtbuf.ByteArrayInputStream;
import org.junit.After;
+import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
@@ -41,6 +46,7 @@ import java.io.InputStream;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
+import java.sql.SQLNonTransientConnectionException;
import java.sql.Statement;
import java.sql.Types;
import java.util.Arrays;
@@ -58,24 +64,52 @@ public class QueryDatabaseTableTest {
MockQueryDatabaseTable processor;
private TestRunner runner;
- final static String DB_LOCATION = "target/db";
+ private final static String DB_LOCATION = "target/db_qdt";
+ private DatabaseAdapter dbAdapter;
@BeforeClass
- public static void setupClass() {
+ public static void setupBeforeClass() throws IOException {
System.setProperty("derby.stream.error.file", "target/derby.log");
+
+ // remove previous test database, if any
+ final File dbLocation = new File(DB_LOCATION);
+ try {
+ FileUtils.deleteFile(dbLocation, true);
+ } catch (IOException ioe) {
+ // Do nothing, may not have existed
+ }
+ }
+
+ @AfterClass
+ public static void cleanUpAfterClass() throws Exception {
+ try {
+ DriverManager.getConnection("jdbc:derby:" + DB_LOCATION + ";shutdown=true");
+ } catch (SQLNonTransientConnectionException e) {
+ // Expected: Derby signals a successful database shutdown by throwing this exception
+ }
+ // remove the test database created by this test class, if any
+ final File dbLocation = new File(DB_LOCATION);
+ try {
+ FileUtils.deleteFile(dbLocation, true);
+ } catch (IOException ioe) {
+ // Do nothing, may not have existed
+ }
}
+
@Before
public void setup() throws InitializationException, IOException {
final DBCPService dbcp = new DBCPServiceSimpleImpl();
final Map<String, String> dbcpProperties = new HashMap<>();
+ dbAdapter = new GenericDatabaseAdapter();
processor = new MockQueryDatabaseTable();
runner = TestRunners.newTestRunner(processor);
runner.addControllerService("dbcp", dbcp, dbcpProperties);
runner.enableControllerService(dbcp);
runner.setProperty(QueryDatabaseTable.DBCP_SERVICE, "dbcp");
+ runner.setProperty(QueryDatabaseTable.DB_TYPE, dbAdapter.getName());
runner.getStateManager().clear(Scope.CLUSTER);
}
@@ -85,20 +119,13 @@ public class QueryDatabaseTableTest {
}
@Test
- public void testGetColumns() throws Exception {
- assertTrue(processor.getColumns(null).isEmpty());
- assertTrue(processor.getColumns("").isEmpty());
- assertEquals(2, processor.getColumns("col1,col2").size());
- }
-
- @Test
public void testGetQuery() throws Exception {
- String query = processor.getQuery("myTable", null, null, null, "None");
+ String query = processor.getQuery(dbAdapter, "myTable", null, null, null);
assertEquals("SELECT * FROM myTable", query);
- query = processor.getQuery("myTable", "col1,col2", null, null, "None");
+ query = processor.getQuery(dbAdapter, "myTable", "col1,col2", null, null);
assertEquals("SELECT col1,col2 FROM myTable", query);
- query = processor.getQuery("myTable", null, Collections.singletonList("id"), null, "None");
+ query = processor.getQuery(dbAdapter, "myTable", null, Collections.singletonList("id"), null);
assertEquals("SELECT * FROM myTable", query);
Map<String, String> maxValues = new HashMap<>();
@@ -106,31 +133,29 @@ public class QueryDatabaseTableTest {
StateManager stateManager = runner.getStateManager();
stateManager.setState(maxValues, Scope.CLUSTER);
processor.putColumnType("id", Types.INTEGER);
- query = processor.getQuery("myTable", null, Collections.singletonList("id"), stateManager.getState(Scope.CLUSTER), "None");
+ query = processor.getQuery(dbAdapter, "myTable", null, Collections.singletonList("id"), stateManager.getState(Scope.CLUSTER));
assertEquals("SELECT * FROM myTable WHERE id > 509", query);
maxValues.put("date_created", "2016-03-07 12:34:56");
stateManager.setState(maxValues, Scope.CLUSTER);
processor.putColumnType("date_created", Types.TIMESTAMP);
- query = processor.getQuery("myTable", null, Arrays.asList("id", "DATE_CREATED"), stateManager.getState(Scope.CLUSTER), "None");
+ query = processor.getQuery(dbAdapter, "myTable", null, Arrays.asList("id", "DATE_CREATED"), stateManager.getState(Scope.CLUSTER));
assertEquals("SELECT * FROM myTable WHERE id > 509 AND DATE_CREATED > '2016-03-07 12:34:56'", query);
+
// Test Oracle strategy
- query = processor.getQuery("myTable", null, Arrays.asList("id", "DATE_CREATED"), stateManager.getState(Scope.CLUSTER), "Oracle");
+ dbAdapter = new OracleDatabaseAdapter();
+ query = processor.getQuery(dbAdapter, "myTable", null, Arrays.asList("id", "DATE_CREATED"), stateManager.getState(Scope.CLUSTER));
assertEquals("SELECT * FROM myTable WHERE id > 509 AND DATE_CREATED > to_date('2016-03-07 12:34:56', 'yyyy-mm-dd HH24:MI:SS')", query);
}
@Test(expected = IllegalArgumentException.class)
public void testGetQueryNoTable() throws Exception {
- processor.getQuery(null, null, null, null, "None");
+ processor.getQuery(dbAdapter, null, null, null, null);
}
@Test
public void testAddedRows() throws ClassNotFoundException, SQLException, InitializationException, IOException {
- // remove previous test database, if any
- final File dbLocation = new File(DB_LOCATION);
- dbLocation.delete();
-
// load test data to database
final Connection con = ((DBCPService) runner.getControllerService("dbcp")).getConnection();
Statement stmt = con.createStatement();
@@ -256,10 +281,6 @@ public class QueryDatabaseTableTest {
@Test
public void testWithNullIntColumn() throws SQLException {
- // remove previous test database, if any
- final File dbLocation = new File(DB_LOCATION);
- dbLocation.delete();
-
// load test data to database
final Connection con = ((DBCPService) runner.getControllerService("dbcp")).getConnection();
Statement stmt = con.createStatement();
@@ -285,10 +306,6 @@ public class QueryDatabaseTableTest {
@Test
public void testWithSqlException() throws SQLException {
- // remove previous test database, if any
- final File dbLocation = new File(DB_LOCATION);
- dbLocation.delete();
-
// load test data to database
final Connection con = ((DBCPService) runner.getControllerService("dbcp")).getConnection();
Statement stmt = con.createStatement();
@@ -330,7 +347,7 @@ public class QueryDatabaseTableTest {
/**
* Simple implementation only for QueryDatabaseTable processor testing.
*/
- class DBCPServiceSimpleImpl extends AbstractControllerService implements DBCPService {
+ private class DBCPServiceSimpleImpl extends AbstractControllerService implements DBCPService {
@Override
public String getIdentifier() {
@@ -350,7 +367,7 @@ public class QueryDatabaseTableTest {
@Stateful(scopes = Scope.CLUSTER, description = "Mock for QueryDatabaseTable processor")
private static class MockQueryDatabaseTable extends QueryDatabaseTable {
- public void putColumnType(String colName, Integer colType) {
+ void putColumnType(String colName, Integer colType) {
columnTypeMap.put(colName, colType);
}
}