Posted to commits@nifi.apache.org by jp...@apache.org on 2016/07/17 22:06:54 UTC

[2/2] nifi git commit: NIFI-2157: Add GenerateTableFetch processor

NIFI-2157: Add GenerateTableFetch processor

This closes #645

Signed-off-by: jpercivall <jo...@yahoo.com>


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/01cae237
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/01cae237
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/01cae237

Branch: refs/heads/master
Commit: 01cae237454c0d7ffedbe7dd08dbe705bc22cb09
Parents: ad02b43
Author: Matt Burgess <ma...@apache.org>
Authored: Wed Jul 13 21:29:51 2016 -0400
Committer: jpercivall <jo...@yahoo.com>
Committed: Sun Jul 17 17:54:42 2016 -0400

----------------------------------------------------------------------
 .../AbstractDatabaseFetchProcessor.java         | 362 ++++++++++++++++++
 .../processors/standard/GenerateTableFetch.java | 256 +++++++++++++
 .../processors/standard/QueryDatabaseTable.java | 382 ++-----------------
 .../processors/standard/db/DatabaseAdapter.java |  38 ++
 .../db/impl/GenericDatabaseAdapter.java         |  64 ++++
 .../standard/db/impl/OracleDatabaseAdapter.java |  82 ++++
 .../org.apache.nifi.processor.Processor         |   1 +
 ....nifi.processors.standard.db.DatabaseAdapter |  16 +
 .../standard/QueryDatabaseTableTest.java        |  77 ++--
 .../standard/TestGenerateTableFetch.java        | 224 +++++++++++
 .../standard/db/impl/DerbyDatabaseAdapter.java  |  68 ++++
 ....nifi.processors.standard.db.DatabaseAdapter |  15 +
 12 files changed, 1204 insertions(+), 381 deletions(-)
----------------------------------------------------------------------
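
For orientation before the per-file diffs, here is a minimal standalone sketch (illustration only, not part of the commit) of the paging behavior GenerateTableFetch provides: the table's row count is divided by the Partition Size property and one SQL statement is generated per page. The table name and the LIMIT/OFFSET syntax below are assumptions about what the Generic database adapter would produce; other adapters (Oracle, for example) emit different clauses.

import java.util.ArrayList;
import java.util.List;

public class PagingSketch {
    public static void main(String[] args) {
        final int rowCount = 25000;      // hypothetical COUNT(*) result for the table
        final int partitionSize = 10000; // value of the Partition Size property

        // Same arithmetic as GenerateTableFetch: one page per full partition, plus one for any remainder;
        // a partition size of zero yields a single statement that fetches the whole table
        final int numberOfFetches = (partitionSize == 0)
                ? 1
                : (rowCount / partitionSize) + (rowCount % partitionSize == 0 ? 0 : 1);

        final List<String> queries = new ArrayList<>();
        for (int i = 0; i < numberOfFetches; i++) {
            Integer limit = partitionSize == 0 ? null : partitionSize;
            Integer offset = partitionSize == 0 ? null : i * partitionSize;
            // Assumed Generic-adapter syntax; in the processor the SQL comes from DatabaseAdapter.getSelectStatement(...)
            queries.add("SELECT * FROM myTable"
                    + (limit == null ? "" : " LIMIT " + limit)
                    + (offset == null || offset == 0 ? "" : " OFFSET " + offset));
        }
        queries.forEach(System.out::println);
        // Three statements are printed here: offsets 0, 10000 and 20000, each limited to 10000 rows
    }
}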


http://git-wip-us.apache.org/repos/asf/nifi/blob/01cae237/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractDatabaseFetchProcessor.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractDatabaseFetchProcessor.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractDatabaseFetchProcessor.java
new file mode 100644
index 0000000..6182d93
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractDatabaseFetchProcessor.java
@@ -0,0 +1,362 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.standard;
+
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.dbcp.DBCPService;
+import org.apache.nifi.processor.AbstractSessionFactoryProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.processors.standard.db.DatabaseAdapter;
+
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.sql.Connection;
+import java.sql.ResultSet;
+import java.sql.ResultSetMetaData;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.sql.Timestamp;
+import java.text.DecimalFormat;
+import java.text.ParseException;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.ServiceLoader;
+import java.util.Set;
+
+import static java.sql.Types.ARRAY;
+import static java.sql.Types.BIGINT;
+import static java.sql.Types.BINARY;
+import static java.sql.Types.BIT;
+import static java.sql.Types.BLOB;
+import static java.sql.Types.BOOLEAN;
+import static java.sql.Types.CLOB;
+import static java.sql.Types.DECIMAL;
+import static java.sql.Types.DOUBLE;
+import static java.sql.Types.FLOAT;
+import static java.sql.Types.INTEGER;
+import static java.sql.Types.LONGVARBINARY;
+import static java.sql.Types.NUMERIC;
+import static java.sql.Types.CHAR;
+import static java.sql.Types.DATE;
+import static java.sql.Types.LONGNVARCHAR;
+import static java.sql.Types.LONGVARCHAR;
+import static java.sql.Types.NCHAR;
+import static java.sql.Types.NVARCHAR;
+import static java.sql.Types.REAL;
+import static java.sql.Types.ROWID;
+import static java.sql.Types.SMALLINT;
+import static java.sql.Types.TIME;
+import static java.sql.Types.TIMESTAMP;
+import static java.sql.Types.TINYINT;
+import static java.sql.Types.VARBINARY;
+import static java.sql.Types.VARCHAR;
+
+/**
+ * A base class for common code shared by processors that fetch RDBMS data.
+ */
+public abstract class AbstractDatabaseFetchProcessor extends AbstractSessionFactoryProcessor {
+
+    // Relationships
+    public static final Relationship REL_SUCCESS = new Relationship.Builder()
+            .name("success")
+            .description("Successfully created FlowFile from SQL query result set.")
+            .build();
+
+    protected Set<Relationship> relationships;
+
+    // Properties
+    public static final PropertyDescriptor DBCP_SERVICE = new PropertyDescriptor.Builder()
+            .name("Database Connection Pooling Service")
+            .description("The Controller Service that is used to obtain a connection to the database.")
+            .required(true)
+            .identifiesControllerService(DBCPService.class)
+            .build();
+
+    public static final PropertyDescriptor TABLE_NAME = new PropertyDescriptor.Builder()
+            .name("Table Name")
+            .description("The name of the database table to be queried.")
+            .required(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .build();
+
+    public static final PropertyDescriptor COLUMN_NAMES = new PropertyDescriptor.Builder()
+            .name("Columns to Return")
+            .description("A comma-separated list of column names to be used in the query. If your database requires "
+                    + "special treatment of the names (quoting, e.g.), each name should include such treatment. If no "
+                    + "column names are supplied, all columns in the specified table will be returned.")
+            .required(false)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .build();
+
+    public static final PropertyDescriptor MAX_VALUE_COLUMN_NAMES = new PropertyDescriptor.Builder()
+            .name("Maximum-value Columns")
+            .description("A comma-separated list of column names. The processor will keep track of the maximum value "
+                    + "for each column that has been returned since the processor started running. This can be used to "
+                    + "retrieve only those rows that have been added/updated since the last retrieval. Note that some "
+                    + "JDBC types such as bit/boolean are not conducive to maintaining maximum value, so columns of these "
+                    + "types should not be listed in this property, and will result in error(s) during processing. If no columns "
+                    + "are provided, all rows from the table will be considered, which could have a performance impact.")
+            .required(false)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .build();
+
+    public static final PropertyDescriptor QUERY_TIMEOUT = new PropertyDescriptor.Builder()
+            .name("Max Wait Time")
+            .description("The maximum amount of time allowed for a running SQL select query "
+                    + ", zero means there is no limit. Max time less than 1 second will be equal to zero.")
+            .defaultValue("0 seconds")
+            .required(true)
+            .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
+            .build();
+
+    protected List<PropertyDescriptor> propDescriptors;
+
+    public static final PropertyDescriptor DB_TYPE;
+
+    protected final static Map<String, DatabaseAdapter> dbAdapters = new HashMap<>();
+    protected final Map<String, Integer> columnTypeMap = new HashMap<>();
+
+    static {
+        // Load the DatabaseAdapters
+        ServiceLoader<DatabaseAdapter> dbAdapterLoader = ServiceLoader.load(DatabaseAdapter.class);
+        dbAdapterLoader.forEach(it -> dbAdapters.put(it.getName(), it));
+
+        DB_TYPE = new PropertyDescriptor.Builder()
+                .name("db-fetch-db-type")
+                .displayName("Database Type")
+                .description("The type/flavor of database, used for generating database-specific code. In many cases the Generic type "
+                        + "should suffice, but some databases (such as Oracle) require custom SQL clauses. ")
+                .allowableValues(dbAdapters.keySet())
+                .defaultValue(dbAdapters.values().stream().findFirst().get().getName())
+                .required(true)
+                .build();
+    }
+
+    public void setup(final ProcessContext context) {
+        // Try to fill the columnTypeMap with the types of the desired max-value columns
+        final DBCPService dbcpService = context.getProperty(DBCP_SERVICE).asControllerService(DBCPService.class);
+        final String tableName = context.getProperty(TABLE_NAME).getValue();
+        final String maxValueColumnNames = context.getProperty(MAX_VALUE_COLUMN_NAMES).getValue();
+        final DatabaseAdapter dbAdapter = dbAdapters.get(context.getProperty(DB_TYPE).getValue());
+
+        try (final Connection con = dbcpService.getConnection();
+             final Statement st = con.createStatement()) {
+
+            // Try a query that returns no rows, for the purposes of getting metadata about the columns. It is possible
+            // to use DatabaseMetaData.getColumns(), but not all drivers support this, notably those for schema-on-read
+            // systems such as Apache Drill
+            String query = dbAdapter.getSelectStatement(tableName, maxValueColumnNames, "1 = 0", null, null, null);
+            ResultSet resultSet = st.executeQuery(query);
+            ResultSetMetaData resultSetMetaData = resultSet.getMetaData();
+            int numCols = resultSetMetaData.getColumnCount();
+            if (numCols > 0) {
+                columnTypeMap.clear();
+                for (int i = 1; i <= numCols; i++) {
+                    String colName = resultSetMetaData.getColumnName(i).toLowerCase();
+                    int colType = resultSetMetaData.getColumnType(i);
+                    columnTypeMap.put(colName, colType);
+                }
+
+            } else {
+                throw new ProcessException("No columns found in table from those specified: " + maxValueColumnNames);
+            }
+
+        } catch (SQLException e) {
+            throw new ProcessException("Unable to communicate with database in order to determine column types", e);
+        }
+    }
+
+    protected static String getMaxValueFromRow(ResultSet resultSet,
+                                               int columnIndex,
+                                               Integer type,
+                                               String maxValueString,
+                                               String databaseType)
+            throws ParseException, IOException, SQLException {
+
+        // Skip any columns we're not keeping track of or whose value is null
+        if (type == null || resultSet.getObject(columnIndex) == null) {
+            return null;
+        }
+
+        switch (type) {
+            case CHAR:
+            case LONGNVARCHAR:
+            case LONGVARCHAR:
+            case NCHAR:
+            case NVARCHAR:
+            case VARCHAR:
+            case ROWID:
+                String colStringValue = resultSet.getString(columnIndex);
+                if (maxValueString == null || colStringValue.compareTo(maxValueString) > 0) {
+                    return colStringValue;
+                }
+                break;
+
+            case INTEGER:
+            case SMALLINT:
+            case TINYINT:
+                Integer colIntValue = resultSet.getInt(columnIndex);
+                Integer maxIntValue = null;
+                if (maxValueString != null) {
+                    maxIntValue = Integer.valueOf(maxValueString);
+                }
+                if (maxIntValue == null || colIntValue > maxIntValue) {
+                    return colIntValue.toString();
+                }
+                break;
+
+            case BIGINT:
+                Long colLongValue = resultSet.getLong(columnIndex);
+                Long maxLongValue = null;
+                if (maxValueString != null) {
+                    maxLongValue = Long.valueOf(maxValueString);
+                }
+                if (maxLongValue == null || colLongValue > maxLongValue) {
+                    return colLongValue.toString();
+                }
+                break;
+
+            case FLOAT:
+            case REAL:
+            case DOUBLE:
+                Double colDoubleValue = resultSet.getDouble(columnIndex);
+                Double maxDoubleValue = null;
+                if (maxValueString != null) {
+                    maxDoubleValue = Double.valueOf(maxValueString);
+                }
+                if (maxDoubleValue == null || colDoubleValue > maxDoubleValue) {
+                    return colDoubleValue.toString();
+                }
+                break;
+
+            case DECIMAL:
+            case NUMERIC:
+                BigDecimal colBigDecimalValue = resultSet.getBigDecimal(columnIndex);
+                BigDecimal maxBigDecimalValue = null;
+                if (maxValueString != null) {
+                    DecimalFormat df = new DecimalFormat();
+                    df.setParseBigDecimal(true);
+                    maxBigDecimalValue = (BigDecimal) df.parse(maxValueString);
+                }
+                if (maxBigDecimalValue == null || colBigDecimalValue.compareTo(maxBigDecimalValue) > 0) {
+                    return colBigDecimalValue.toString();
+                }
+                break;
+
+            case DATE:
+                Date rawColDateValue = resultSet.getDate(columnIndex);
+                java.sql.Date colDateValue = new java.sql.Date(rawColDateValue.getTime());
+                java.sql.Date maxDateValue = null;
+                if (maxValueString != null) {
+                    maxDateValue = java.sql.Date.valueOf(maxValueString);
+                }
+                if (maxDateValue == null || colDateValue.after(maxDateValue)) {
+                    return colDateValue.toString();
+                }
+                break;
+
+            case TIME:
+                Date rawColTimeValue = resultSet.getDate(columnIndex);
+                java.sql.Time colTimeValue = new java.sql.Time(rawColTimeValue.getTime());
+                java.sql.Time maxTimeValue = null;
+                if (maxValueString != null) {
+                    maxTimeValue = java.sql.Time.valueOf(maxValueString);
+                }
+                if (maxTimeValue == null || colTimeValue.after(maxTimeValue)) {
+                    return colTimeValue.toString();
+                }
+                break;
+
+            case TIMESTAMP:
+                // Oracle timestamp queries must use literals in java.sql.Date format
+                if ("Oracle".equals(databaseType)) {
+                    Date rawColOracleTimestampValue = resultSet.getDate(columnIndex);
+                    java.sql.Date oracleTimestampValue = new java.sql.Date(rawColOracleTimestampValue.getTime());
+                    java.sql.Date maxOracleTimestampValue = null;
+                    if (maxValueString != null) {
+                        maxOracleTimestampValue = java.sql.Date.valueOf(maxValueString);
+                    }
+                    if (maxOracleTimestampValue == null || oracleTimestampValue.after(maxOracleTimestampValue)) {
+                        return oracleTimestampValue.toString();
+                    }
+                } else {
+                    Timestamp rawColTimestampValue = resultSet.getTimestamp(columnIndex);
+                    java.sql.Timestamp colTimestampValue = new java.sql.Timestamp(rawColTimestampValue.getTime());
+                    java.sql.Timestamp maxTimestampValue = null;
+                    if (maxValueString != null) {
+                        maxTimestampValue = java.sql.Timestamp.valueOf(maxValueString);
+                    }
+                    if (maxTimestampValue == null || colTimestampValue.after(maxTimestampValue)) {
+                        return colTimestampValue.toString();
+                    }
+                }
+                break;
+
+            case BIT:
+            case BOOLEAN:
+            case BINARY:
+            case VARBINARY:
+            case LONGVARBINARY:
+            case ARRAY:
+            case BLOB:
+            case CLOB:
+            default:
+                throw new IOException("Type for column " + columnIndex + " is not valid for maintaining maximum value");
+        }
+        return null;
+    }
+
+    /**
+     * Returns a SQL literal for the given value based on its type. For example, values of character type need to be enclosed
+     * in single quotes, whereas values of numeric type should not be.
+     *
+     * @param type  The JDBC type for the desired literal
+     * @param value The value to be converted to a SQL literal
+     * @return A String representing the given value as a literal of the given type
+     */
+    protected static String getLiteralByType(int type, String value, String databaseType) {
+        // Format value based on column type. For example, strings and timestamps need to be quoted
+        switch (type) {
+            // For string-represented values, put in single quotes
+            case CHAR:
+            case LONGNVARCHAR:
+            case LONGVARCHAR:
+            case NCHAR:
+            case NVARCHAR:
+            case VARCHAR:
+            case ROWID:
+            case DATE:
+            case TIME:
+                return "'" + value + "'";
+            case TIMESTAMP:
+                // Timestamp literals in Oracle need to be cast with TO_DATE
+                if ("Oracle".equals(databaseType)) {
+                    return "to_date('" + value + "', 'yyyy-mm-dd HH24:MI:SS')";
+                } else {
+                    return "'" + value + "'";
+                }
+                // Else leave as is (numeric types, e.g.)
+            default:
+                return value;
+        }
+    }
+}
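
The literal-quoting rules in getLiteralByType above can be exercised in isolation with a small standalone sketch (illustration only, not NiFi API code; the column names and values are hypothetical, and the switch simply mirrors the cases handled in this commit):

import java.sql.Types;

public class LiteralSketch {
    // Mirrors AbstractDatabaseFetchProcessor.getLiteralByType for a few representative JDBC types
    static String literal(int type, String value, String databaseType) {
        switch (type) {
            case Types.CHAR:
            case Types.VARCHAR:
            case Types.DATE:
            case Types.TIME:
                return "'" + value + "'";   // string, date and time values are single-quoted
            case Types.TIMESTAMP:
                // Oracle timestamp literals need TO_DATE; everything else is quoted as-is
                return "Oracle".equals(databaseType)
                        ? "to_date('" + value + "', 'yyyy-mm-dd HH24:MI:SS')"
                        : "'" + value + "'";
            default:
                return value;               // numeric types are left unquoted
        }
    }

    public static void main(String[] args) {
        System.out.println("id > " + literal(Types.INTEGER, "42", "Generic"));
        // id > 42
        System.out.println("name > " + literal(Types.VARCHAR, "smith", "Generic"));
        // name > 'smith'
        System.out.println("updated > " + literal(Types.TIMESTAMP, "2016-07-17 17:54:42", "Oracle"));
        // updated > to_date('2016-07-17 17:54:42', 'yyyy-mm-dd HH24:MI:SS')
    }
}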

http://git-wip-us.apache.org/repos/asf/nifi/blob/01cae237/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GenerateTableFetch.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GenerateTableFetch.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GenerateTableFetch.java
new file mode 100644
index 0000000..23a2e3c
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GenerateTableFetch.java
@@ -0,0 +1,256 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.standard;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
+import org.apache.nifi.annotation.behavior.Stateful;
+import org.apache.nifi.annotation.behavior.TriggerSerially;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.state.Scope;
+import org.apache.nifi.components.state.StateManager;
+import org.apache.nifi.components.state.StateMap;
+import org.apache.nifi.dbcp.DBCPService;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.ProcessSessionFactory;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.processors.standard.db.DatabaseAdapter;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.ResultSet;
+import java.sql.ResultSetMetaData;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.text.ParseException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+
+@TriggerSerially
+@InputRequirement(Requirement.INPUT_FORBIDDEN)
+@Tags({"sql", "select", "jdbc", "query", "database", "fetch", "generate"})
+@SeeAlso({QueryDatabaseTable.class, ExecuteSQL.class})
+@CapabilityDescription("Generates SQL select queries that fetch \"pages\" of rows from a table. The partition size property, along with the table's row count, "
+        + "determine the size and number of pages and generated FlowFiles. In addition, incremental fetching can be achieved by setting Maximum-Value Columns, "
+        + "which causes the processor to track the columns' maximum values, thus only fetching rows whose columns' values exceed the observed maximums. This "
+        + "processor is intended to be run on the Primary Node only.")
+@Stateful(scopes = Scope.CLUSTER, description = "After performing a query on the specified table, the maximum values for "
+        + "the specified column(s) will be retained for use in future executions of the query. This allows the Processor "
+        + "to fetch only those records that have max values greater than the retained values. This can be used for "
+        + "incremental fetching, fetching of newly added rows, etc. To clear the maximum values, clear the state of the processor "
+        + "per the State Management documentation")
+public class GenerateTableFetch extends AbstractDatabaseFetchProcessor {
+
+    public static final PropertyDescriptor PARTITION_SIZE = new PropertyDescriptor.Builder()
+            .name("gen-table-fetch-partition-size")
+            .displayName("Partition Size")
+            .description("The number of result rows to be fetched by each generated SQL statement. The total number of rows in "
+                    + "the table divided by the partition size gives the number of SQL statements (i.e. FlowFiles) generated. A "
+                    + "value of zero indicates that a single FlowFile is to be generated whose SQL statement will fetch all rows "
+                    + "in the table.")
+            .defaultValue("10000")
+            .required(true)
+            .expressionLanguageSupported(false)
+            .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
+            .build();
+
+    public GenerateTableFetch() {
+        final Set<Relationship> r = new HashSet<>();
+        r.add(REL_SUCCESS);
+        relationships = Collections.unmodifiableSet(r);
+
+        final List<PropertyDescriptor> pds = new ArrayList<>();
+        pds.add(DBCP_SERVICE);
+        pds.add(DB_TYPE);
+        pds.add(TABLE_NAME);
+        pds.add(COLUMN_NAMES);
+        pds.add(MAX_VALUE_COLUMN_NAMES);
+        pds.add(QUERY_TIMEOUT);
+        pds.add(PARTITION_SIZE);
+        propDescriptors = Collections.unmodifiableList(pds);
+    }
+
+    @Override
+    public Set<Relationship> getRelationships() {
+        return relationships;
+    }
+
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return propDescriptors;
+    }
+
+    @OnScheduled
+    public void setup(final ProcessContext context) {
+        super.setup(context);
+    }
+
+    @Override
+    public void onTrigger(final ProcessContext context, final ProcessSessionFactory sessionFactory) throws ProcessException {
+        ProcessSession session = sessionFactory.createSession();
+        final ComponentLog logger = getLogger();
+
+        final DBCPService dbcpService = context.getProperty(DBCP_SERVICE).asControllerService(DBCPService.class);
+        final DatabaseAdapter dbAdapter = dbAdapters.get(context.getProperty(DB_TYPE).getValue());
+        final String tableName = context.getProperty(TABLE_NAME).getValue();
+        final String columnNames = context.getProperty(COLUMN_NAMES).getValue();
+        final String maxValueColumnNames = context.getProperty(MAX_VALUE_COLUMN_NAMES).getValue();
+        final int partitionSize = context.getProperty(PARTITION_SIZE).asInteger();
+
+        final StateManager stateManager = context.getStateManager();
+        final StateMap stateMap;
+
+        try {
+            stateMap = stateManager.getState(Scope.CLUSTER);
+        } catch (final IOException ioe) {
+            logger.error("Failed to retrieve observed maximum values from the State Manager. Will not perform "
+                    + "query until this is accomplished.", ioe);
+            context.yield();
+            return;
+        }
+        try {
+            // Make a mutable copy of the current state property map. This will be updated by the result row callback, and eventually
+            // set as the current state map (after the session has been committed)
+            final Map<String, String> statePropertyMap = new HashMap<>(stateMap.toMap());
+
+            // Build a WHERE clause with maximum-value columns (if they exist), and a list of column names that will contain MAX(<column>) aliases. The
+            // executed SQL query will retrieve the count of all records after the filter(s) have been applied, as well as the new maximum values for the
+            // specified columns. This allows the processor to generate the correctly partitioned SQL statements as well as to update the state with the
+            // latest observed maximum values.
+            String whereClause = null;
+            List<String> maxValueColumnNameList = StringUtils.isEmpty(maxValueColumnNames)
+                    ? new ArrayList<>(0)
+                    : Arrays.asList(maxValueColumnNames.split("\\s*,\\s*"));
+            List<String> maxValueClauses = new ArrayList<>(maxValueColumnNameList.size());
+
+            String columnsClause = null;
+            List<String> maxValueSelectColumns = new ArrayList<>(maxValueColumnNameList.size() + 1);
+            maxValueSelectColumns.add("COUNT(*)");
+
+            // For each maximum-value column, get a WHERE filter and a MAX(column) alias
+            maxValueColumnNameList.forEach(colName -> {
+                maxValueSelectColumns.add("MAX(" + colName + ") " + colName);
+                String maxValue = statePropertyMap.get(colName.toLowerCase());
+                if (!StringUtils.isEmpty(maxValue)) {
+                    Integer type = columnTypeMap.get(colName.toLowerCase());
+                    if (type == null) {
+                        // This shouldn't happen as we are populating columnTypeMap when the processor is scheduled.
+                        throw new IllegalArgumentException("No column type found for: " + colName);
+                    }
+                    // Add a condition for the WHERE clause
+                    maxValueClauses.add(colName + " > " + getLiteralByType(type, maxValue, dbAdapter.getName()));
+                }
+            });
+
+            whereClause = StringUtils.join(maxValueClauses, " AND ");
+            columnsClause = StringUtils.join(maxValueSelectColumns, ", ");
+
+            // Build a SELECT query with maximum-value columns (if present)
+            final String selectQuery = dbAdapter.getSelectStatement(tableName, columnsClause, whereClause, null, null, null);
+            int rowCount = 0;
+
+            try (final Connection con = dbcpService.getConnection();
+                 final Statement st = con.createStatement()) {
+
+                final Integer queryTimeout = context.getProperty(QUERY_TIMEOUT).asTimePeriod(TimeUnit.SECONDS).intValue();
+                st.setQueryTimeout(queryTimeout); // timeout in seconds
+
+                logger.debug("Executing {}", new Object[]{selectQuery});
+                ResultSet resultSet;
+
+                resultSet = st.executeQuery(selectQuery);
+
+                if (resultSet.next()) {
+                    // Total row count is in the first column
+                    rowCount = resultSet.getInt(1);
+
+                    // Update the state map with the newly-observed maximum values
+                    ResultSetMetaData rsmd = resultSet.getMetaData();
+                    for (int i = 2; i <= rsmd.getColumnCount(); i++) {
+                        String resultColumnName = rsmd.getColumnName(i).toLowerCase();
+                        int type = rsmd.getColumnType(i);
+                        try {
+                            String newMaxValue = getMaxValueFromRow(resultSet, i, type, statePropertyMap.get(resultColumnName.toLowerCase()), dbAdapter.getName());
+                            if (newMaxValue != null) {
+                                statePropertyMap.put(resultColumnName, newMaxValue);
+                            }
+                        } catch (ParseException | IOException pie) {
+                            // Fail the whole thing here before we start creating flow files and such
+                            throw new ProcessException(pie);
+                        }
+                    }
+                } else {
+                    // Something is very wrong here, one row (even if count is zero) should be returned
+                    throw new SQLException("No rows returned from metadata query: " + selectQuery);
+                }
+            } catch (SQLException e) {
+                logger.error("Unable to execute SQL select query {} due to {}", new Object[]{selectQuery, e});
+                throw new ProcessException(e);
+            }
+            final int numberOfFetches = (partitionSize == 0) ? 1 : (rowCount / partitionSize) + (rowCount % partitionSize == 0 ? 0 : 1);
+
+
+            // Generate SQL statements to read "pages" of data
+            for (int i = 0; i < numberOfFetches; i++) {
+                FlowFile sqlFlowFile;
+
+                Integer limit = partitionSize == 0 ? null : partitionSize;
+                Integer offset = partitionSize == 0 ? null : i * partitionSize;
+                final String query = dbAdapter.getSelectStatement(tableName, columnNames, StringUtils.join(maxValueClauses, " AND "), null, limit, offset);
+                sqlFlowFile = session.create();
+                sqlFlowFile = session.write(sqlFlowFile, out -> {
+                    out.write(query.getBytes());
+                });
+                session.transfer(sqlFlowFile, REL_SUCCESS);
+            }
+
+            session.commit();
+            try {
+                // Update the state
+                stateManager.setState(statePropertyMap, Scope.CLUSTER);
+            } catch (IOException ioe) {
+                logger.error("{} failed to update State Manager, observed maximum values will not be recorded. "
+                                + "Also, any generated SQL statements may be duplicated.",
+                        new Object[]{this, ioe});
+            }
+        } catch (final ProcessException pe) {
+            // Log the cause of the ProcessException if it is available
+            Throwable t = (pe.getCause() == null ? pe : pe.getCause());
+            logger.error("Error during processing: {}", new Object[]{t.getMessage()}, t);
+            session.rollback();
+            context.yield();
+        }
+    }
+}
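
The incremental WHERE clause assembled in onTrigger above can be sketched on its own as follows (illustration only; the column names, state values and hand-applied quoting are hypothetical, while the join-with-AND logic mirrors the maxValueClauses handling in this commit):

import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class WhereClauseSketch {
    public static void main(String[] args) {
        // Hypothetical state previously stored by the processor (keys are lower-cased column names)
        Map<String, String> statePropertyMap = new HashMap<>();
        statePropertyMap.put("id", "1000");
        statePropertyMap.put("updated_at", "2016-07-17 17:54:42");

        List<String> maxValueColumns = Arrays.asList("id", "updated_at");
        List<String> maxValueClauses = new ArrayList<>();

        for (String colName : maxValueColumns) {
            String maxValue = statePropertyMap.get(colName.toLowerCase());
            if (maxValue != null && !maxValue.isEmpty()) {
                // In the processor the literal comes from getLiteralByType using the column's JDBC type;
                // here the quoting is applied by hand for illustration
                String literal = "updated_at".equals(colName) ? "'" + maxValue + "'" : maxValue;
                maxValueClauses.add(colName + " > " + literal);
            }
        }

        String whereClause = String.join(" AND ", maxValueClauses);
        System.out.println("SELECT COUNT(*), MAX(id) id, MAX(updated_at) updated_at FROM myTable WHERE " + whereClause);
        // prints: SELECT COUNT(*), MAX(id) id, MAX(updated_at) updated_at FROM myTable WHERE id > 1000 AND updated_at > '2016-07-17 17:54:42'
    }
}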

http://git-wip-us.apache.org/repos/asf/nifi/blob/01cae237/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryDatabaseTable.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryDatabaseTable.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryDatabaseTable.java
index e749696..ea24dd0 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryDatabaseTable.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryDatabaseTable.java
@@ -32,31 +32,26 @@ import org.apache.nifi.components.state.StateMap;
 import org.apache.nifi.dbcp.DBCPService;
 import org.apache.nifi.flowfile.FlowFile;
 import org.apache.nifi.logging.ComponentLog;
-import org.apache.nifi.processor.AbstractSessionFactoryProcessor;
 import org.apache.nifi.processor.ProcessContext;
 import org.apache.nifi.processor.ProcessSession;
 import org.apache.nifi.processor.ProcessSessionFactory;
 import org.apache.nifi.processor.Relationship;
 import org.apache.nifi.processor.exception.ProcessException;
-import org.apache.nifi.processor.io.OutputStreamCallback;
 import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.processors.standard.db.DatabaseAdapter;
 import org.apache.nifi.processors.standard.util.JdbcCommon;
 import org.apache.nifi.util.StopWatch;
 
 import java.io.IOException;
-import java.io.OutputStream;
-import java.math.BigDecimal;
 import java.sql.Connection;
 import java.sql.DatabaseMetaData;
-import java.sql.Date;
 import java.sql.ResultSet;
 import java.sql.ResultSetMetaData;
 import java.sql.SQLException;
 import java.sql.Statement;
-import java.sql.Timestamp;
-import java.text.DecimalFormat;
 import java.text.ParseException;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -64,36 +59,9 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
-
-import static java.sql.Types.ARRAY;
-import static java.sql.Types.BIGINT;
-import static java.sql.Types.BINARY;
-import static java.sql.Types.BIT;
-import static java.sql.Types.BLOB;
-import static java.sql.Types.BOOLEAN;
-import static java.sql.Types.CHAR;
-import static java.sql.Types.CLOB;
-import static java.sql.Types.DATE;
-import static java.sql.Types.DECIMAL;
-import static java.sql.Types.DOUBLE;
-import static java.sql.Types.FLOAT;
-import static java.sql.Types.INTEGER;
-import static java.sql.Types.LONGNVARCHAR;
-import static java.sql.Types.LONGVARBINARY;
-import static java.sql.Types.LONGVARCHAR;
-import static java.sql.Types.NCHAR;
-import static java.sql.Types.NUMERIC;
-import static java.sql.Types.NVARCHAR;
-import static java.sql.Types.REAL;
-import static java.sql.Types.ROWID;
-import static java.sql.Types.SMALLINT;
-import static java.sql.Types.TIME;
-import static java.sql.Types.TIMESTAMP;
-import static java.sql.Types.TINYINT;
-import static java.sql.Types.VARBINARY;
-import static java.sql.Types.VARCHAR;
 import java.util.concurrent.atomic.AtomicLong;
 
+
 @EventDriven
 @InputRequirement(Requirement.INPUT_FORBIDDEN)
 @Tags({"sql", "select", "jdbc", "query", "database"})
@@ -108,72 +76,10 @@ import java.util.concurrent.atomic.AtomicLong;
         + "incremental fetching, fetching of newly added rows, etc. To clear the maximum values, clear the state of the processor "
         + "per the State Management documentation")
 @WritesAttribute(attribute = "querydbtable.row.count")
-public class QueryDatabaseTable extends AbstractSessionFactoryProcessor {
+public class QueryDatabaseTable extends AbstractDatabaseFetchProcessor {
 
     public static final String RESULT_ROW_COUNT = "querydbtable.row.count";
 
-    public static final String SQL_PREPROCESS_STRATEGY_NONE = "None";
-    public static final String SQL_PREPROCESS_STRATEGY_ORACLE = "Oracle";
-
-    // Relationships
-    public static final Relationship REL_SUCCESS = new Relationship.Builder()
-            .name("success")
-            .description("Successfully created FlowFile from SQL query result set.")
-            .build();
-
-    private final Set<Relationship> relationships;
-
-    public static final PropertyDescriptor DBCP_SERVICE = new PropertyDescriptor.Builder()
-            .name("Database Connection Pooling Service")
-            .description("The Controller Service that is used to obtain a connection to the database.")
-            .required(true)
-            .identifiesControllerService(DBCPService.class)
-            .build();
-
-    public static final PropertyDescriptor TABLE_NAME = new PropertyDescriptor.Builder()
-            .name("Table Name")
-            .description("The name of the database table to be queried.")
-            .required(true)
-            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
-            .build();
-
-    public static final PropertyDescriptor COLUMN_NAMES = new PropertyDescriptor.Builder()
-            .name("Columns to Return")
-            .description("A comma-separated list of column names to be used in the query. If your database requires "
-                    + "special treatment of the names (quoting, e.g.), each name should include such treatment. If no "
-                    + "column names are supplied, all columns in the specified table will be returned.")
-            .required(false)
-            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
-            .build();
-
-    public static final PropertyDescriptor MAX_VALUE_COLUMN_NAMES = new PropertyDescriptor.Builder()
-            .name("Maximum-value Columns")
-            .description("A comma-separated list of column names. The processor will keep track of the maximum value "
-                    + "for each column that has been returned since the processor started running. This can be used to "
-                    + "retrieve only those rows that have been added/updated since the last retrieval. Note that some "
-                    + "JDBC types such as bit/boolean are not conducive to maintaining maximum value, so columns of these "
-                    + "types should not be listed in this property, and will result in error(s) during processing.")
-            .required(false)
-            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
-            .build();
-
-    public static final PropertyDescriptor QUERY_TIMEOUT = new PropertyDescriptor.Builder()
-            .name("Max Wait Time")
-            .description("The maximum amount of time allowed for a running SQL select query "
-                    + ", zero means there is no limit. Max time less than 1 second will be equal to zero.")
-            .defaultValue("0 seconds")
-            .required(true)
-            .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
-            .build();
-
-    public static final PropertyDescriptor SQL_PREPROCESS_STRATEGY = new PropertyDescriptor.Builder()
-            .name("SQL Pre-processing Strategy")
-            .description("The strategy to employ when generating the SQL for querying the table. A strategy may include "
-                    + "custom or database-specific code, such as the treatment of time/date formats.")
-            .required(true)
-            .allowableValues(SQL_PREPROCESS_STRATEGY_NONE, SQL_PREPROCESS_STRATEGY_ORACLE)
-            .defaultValue("None")
-            .build();
 
     public static final PropertyDescriptor FETCH_SIZE = new PropertyDescriptor.Builder()
             .name("Fetch Size")
@@ -184,11 +90,6 @@ public class QueryDatabaseTable extends AbstractSessionFactoryProcessor {
             .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
             .build();
 
-
-    private final List<PropertyDescriptor> propDescriptors;
-
-    protected final Map<String, Integer> columnTypeMap = new HashMap<>();
-
     public QueryDatabaseTable() {
         final Set<Relationship> r = new HashSet<>();
         r.add(REL_SUCCESS);
@@ -196,11 +97,11 @@ public class QueryDatabaseTable extends AbstractSessionFactoryProcessor {
 
         final List<PropertyDescriptor> pds = new ArrayList<>();
         pds.add(DBCP_SERVICE);
+        pds.add(DB_TYPE);
         pds.add(TABLE_NAME);
         pds.add(COLUMN_NAMES);
         pds.add(MAX_VALUE_COLUMN_NAMES);
         pds.add(QUERY_TIMEOUT);
-        pds.add(SQL_PREPROCESS_STRATEGY);
         pds.add(FETCH_SIZE);
         propDescriptors = Collections.unmodifiableList(pds);
     }
@@ -217,36 +118,7 @@ public class QueryDatabaseTable extends AbstractSessionFactoryProcessor {
 
     @OnScheduled
     public void setup(final ProcessContext context) {
-        // Try to fill the columnTypeMap with the types of the desired max-value columns
-        final DBCPService dbcpService = context.getProperty(DBCP_SERVICE).asControllerService(DBCPService.class);
-        final String tableName = context.getProperty(TABLE_NAME).getValue();
-        final String maxValueColumnNames = context.getProperty(MAX_VALUE_COLUMN_NAMES).getValue();
-
-        try (final Connection con = dbcpService.getConnection();
-             final Statement st = con.createStatement()) {
-
-            // Try a query that returns no rows, for the purposes of getting metadata about the columns. It is possible
-            // to use DatabaseMetaData.getColumns(), but not all drivers support this, notably the schema-on-read
-            // approach as in Apache Drill
-            String query = getSelectFromClause(tableName, maxValueColumnNames).append(" WHERE 1 = 0").toString();
-            ResultSet resultSet = st.executeQuery(query);
-            ResultSetMetaData resultSetMetaData = resultSet.getMetaData();
-            int numCols = resultSetMetaData.getColumnCount();
-            if (numCols > 0) {
-                columnTypeMap.clear();
-                for (int i = 1; i <= numCols; i++) {
-                    String colName = resultSetMetaData.getColumnName(i).toLowerCase();
-                    int colType = resultSetMetaData.getColumnType(i);
-                    columnTypeMap.put(colName, colType);
-                }
-
-            } else {
-                throw new ProcessException("No columns found in table from those specified: " + maxValueColumnNames);
-            }
-
-        } catch (SQLException e) {
-            throw new ProcessException("Unable to communicate with database in order to determine column types", e);
-        }
+        super.setup(context);
     }
 
     @Override
@@ -257,10 +129,10 @@ public class QueryDatabaseTable extends AbstractSessionFactoryProcessor {
         final ComponentLog logger = getLogger();
 
         final DBCPService dbcpService = context.getProperty(DBCP_SERVICE).asControllerService(DBCPService.class);
+        final DatabaseAdapter dbAdapter = dbAdapters.get(context.getProperty(DB_TYPE).getValue());
         final String tableName = context.getProperty(TABLE_NAME).getValue();
         final String columnNames = context.getProperty(COLUMN_NAMES).getValue();
         final String maxValueColumnNames = context.getProperty(MAX_VALUE_COLUMN_NAMES).getValue();
-        final String preProcessStrategy = context.getProperty(SQL_PREPROCESS_STRATEGY).getValue();
         final Integer fetchSize = context.getProperty(FETCH_SIZE).asInteger();
 
         final StateManager stateManager = context.getStateManager();
@@ -278,7 +150,10 @@ public class QueryDatabaseTable extends AbstractSessionFactoryProcessor {
         // set as the current state map (after the session has been committed)
         final Map<String, String> statePropertyMap = new HashMap<>(stateMap.toMap());
 
-        final String selectQuery = getQuery(tableName, columnNames, getColumns(maxValueColumnNames), stateMap, preProcessStrategy);
+        List<String> maxValueColumnNameList = StringUtils.isEmpty(maxValueColumnNames)
+                ? null
+                : Arrays.asList(maxValueColumnNames.split("\\s*,\\s*"));
+        final String selectQuery = getQuery(dbAdapter, tableName, columnNames, maxValueColumnNameList, stateMap);
         final StopWatch stopWatch = new StopWatch(true);
 
         try (final Connection con = dbcpService.getConnection();
@@ -299,19 +174,16 @@ public class QueryDatabaseTable extends AbstractSessionFactoryProcessor {
             final AtomicLong nrOfRows = new AtomicLong(0L);
 
             fileToProcess = session.create();
-            fileToProcess = session.write(fileToProcess, new OutputStreamCallback() {
-                @Override
-                public void process(final OutputStream out) throws IOException {
-                    try {
-                        logger.debug("Executing query {}", new Object[]{selectQuery});
-                        final ResultSet resultSet = st.executeQuery(selectQuery);
-                        // Max values will be updated in the state property map by the callback
-                        final MaxValueResultSetRowCollector maxValCollector = new MaxValueResultSetRowCollector(statePropertyMap, preProcessStrategy);
-                        nrOfRows.set(JdbcCommon.convertToAvroStream(resultSet, out, tableName, maxValCollector));
-
-                    } catch (final SQLException e) {
-                        throw new ProcessException("Error during database query or conversion of records to Avro", e);
-                    }
+            fileToProcess = session.write(fileToProcess, out -> {
+                try {
+                    logger.debug("Executing query {}", new Object[]{selectQuery});
+                    final ResultSet resultSet = st.executeQuery(selectQuery);
+                    // Max values will be updated in the state property map by the callback
+                    final MaxValueResultSetRowCollector maxValCollector = new MaxValueResultSetRowCollector(statePropertyMap, dbAdapter);
+                    nrOfRows.set(JdbcCommon.convertToAvroStream(resultSet, out, tableName, maxValCollector));
+
+                } catch (final SQLException e) {
+                    throw new ProcessException("Error during database query or conversion of records to Avro", e);
                 }
             });
 
@@ -355,29 +227,12 @@ public class QueryDatabaseTable extends AbstractSessionFactoryProcessor {
         }
     }
 
-    protected List<String> getColumns(String commaSeparatedColumnList) {
-        if (StringUtils.isEmpty(commaSeparatedColumnList)) {
-            return Collections.emptyList();
-        }
-        final String[] columns = commaSeparatedColumnList.split(",");
-        final List<String> columnList = new ArrayList<>(columns.length);
-        for (String column : columns) {
-            if (column != null) {
-                String trimmedColumn = column.trim();
-                if (!StringUtils.isEmpty(trimmedColumn)) {
-                    columnList.add(trimmedColumn);
-                }
-            }
-        }
-        return columnList;
-    }
-
-    protected String getQuery(String tableName, String columnNames, List<String> maxValColumnNames,
-                              StateMap stateMap, String preProcessStrategy) {
+    protected String getQuery(DatabaseAdapter dbAdapter, String tableName, String columnNames, List<String> maxValColumnNames,
+                              StateMap stateMap) {
         if (StringUtils.isEmpty(tableName)) {
             throw new IllegalArgumentException("Table name must be specified");
         }
-        final StringBuilder query = new StringBuilder(getSelectFromClause(tableName, columnNames));
+        final StringBuilder query = new StringBuilder(dbAdapter.getSelectStatement(tableName, columnNames, null, null, null, null));
 
         // Check state map for last max values
         if (stateMap != null && stateMap.getVersion() != -1 && maxValColumnNames != null) {
@@ -392,7 +247,7 @@ public class QueryDatabaseTable extends AbstractSessionFactoryProcessor {
                         throw new IllegalArgumentException("No column type found for: " + colName);
                     }
                     // Add a condition for the WHERE clause
-                    whereClauses.add(colName + " > " + getLiteralByType(type, maxValue, preProcessStrategy));
+                    whereClauses.add(colName + " > " + getLiteralByType(type, maxValue, dbAdapter.getName()));
                 }
             }
             if (!whereClauses.isEmpty()) {
@@ -404,67 +259,13 @@ public class QueryDatabaseTable extends AbstractSessionFactoryProcessor {
         return query.toString();
     }
 
-    /**
-     * Returns a basic SELECT ... FROM clause with the given column names and table name. If no column names are found,
-     * the wildcard (*) is used to select all columns.
-     *
-     * @param tableName   The name of the table to select from
-     * @param columnNames A comma-separated list of column names to select from the table
-     * @return A SQL select statement representing a query of the given column names from the given table
-     */
-    protected StringBuilder getSelectFromClause(String tableName, String columnNames) {
-        final StringBuilder query = new StringBuilder("SELECT ");
-        if (StringUtils.isEmpty(columnNames) || columnNames.trim().equals("*")) {
-            query.append("*");
-        } else {
-            query.append(columnNames);
-        }
-        query.append(" FROM ");
-        query.append(tableName);
-        return query;
-    }
-
-    /**
-     * Returns a SQL literal for the given value based on its type. For example, values of character type need to be enclosed
-     * in single quotes, whereas values of numeric type should not be.
-     *
-     * @param type  The JDBC type for the desired literal
-     * @param value The value to be converted to a SQL literal
-     * @return A String representing the given value as a literal of the given type
-     */
-    protected String getLiteralByType(int type, String value, String preProcessStrategy) {
-        // Format value based on column type. For example, strings and timestamps need to be quoted
-        switch (type) {
-            // For string-represented values, put in single quotes
-            case CHAR:
-            case LONGNVARCHAR:
-            case LONGVARCHAR:
-            case NCHAR:
-            case NVARCHAR:
-            case VARCHAR:
-            case ROWID:
-            case DATE:
-            case TIME:
-                return "'" + value + "'";
-            case TIMESTAMP:
-                // Timestamp literals in Oracle need to be cast with TO_DATE
-                if (SQL_PREPROCESS_STRATEGY_ORACLE.equals(preProcessStrategy)) {
-                    return "to_date('" + value + "', 'yyyy-mm-dd HH24:MI:SS')";
-                } else {
-                    return "'" + value + "'";
-                }
-                // Else leave as is (numeric types, e.g.)
-            default:
-                return value;
-        }
-    }
 
     protected class MaxValueResultSetRowCollector implements JdbcCommon.ResultSetRowCallback {
-        String preProcessStrategy;
+        DatabaseAdapter dbAdapter;
         Map<String, String> newColMap;
 
-        public MaxValueResultSetRowCollector(Map<String, String> stateMap, String preProcessStrategy) {
-            this.preProcessStrategy = preProcessStrategy;
+        public MaxValueResultSetRowCollector(Map<String, String> stateMap, DatabaseAdapter dbAdapter) {
+            this.dbAdapter = dbAdapter;
             newColMap = stateMap;
         }
 
@@ -486,130 +287,9 @@ public class QueryDatabaseTable extends AbstractSessionFactoryProcessor {
                             continue;
                         }
                         String maxValueString = newColMap.get(colName);
-                        switch (type) {
-                            case CHAR:
-                            case LONGNVARCHAR:
-                            case LONGVARCHAR:
-                            case NCHAR:
-                            case NVARCHAR:
-                            case VARCHAR:
-                            case ROWID:
-                                String colStringValue = resultSet.getString(i);
-                                if (maxValueString == null || colStringValue.compareTo(maxValueString) > 0) {
-                                    newColMap.put(colName, colStringValue);
-                                }
-                                break;
-
-                            case INTEGER:
-                            case SMALLINT:
-                            case TINYINT:
-                                Integer colIntValue = resultSet.getInt(i);
-                                Integer maxIntValue = null;
-                                if (maxValueString != null) {
-                                    maxIntValue = Integer.valueOf(maxValueString);
-                                }
-                                if (maxIntValue == null || colIntValue > maxIntValue) {
-                                    newColMap.put(colName, colIntValue.toString());
-                                }
-                                break;
-
-                            case BIGINT:
-                                Long colLongValue = resultSet.getLong(i);
-                                Long maxLongValue = null;
-                                if (maxValueString != null) {
-                                    maxLongValue = Long.valueOf(maxValueString);
-                                }
-                                if (maxLongValue == null || colLongValue > maxLongValue) {
-                                    newColMap.put(colName, colLongValue.toString());
-                                }
-                                break;
-
-                            case FLOAT:
-                            case REAL:
-                            case DOUBLE:
-                                Double colDoubleValue = resultSet.getDouble(i);
-                                Double maxDoubleValue = null;
-                                if (maxValueString != null) {
-                                    maxDoubleValue = Double.valueOf(maxValueString);
-                                }
-                                if (maxDoubleValue == null || colDoubleValue > maxDoubleValue) {
-                                    newColMap.put(colName, colDoubleValue.toString());
-                                }
-                                break;
-
-                            case DECIMAL:
-                            case NUMERIC:
-                                BigDecimal colBigDecimalValue = resultSet.getBigDecimal(i);
-                                BigDecimal maxBigDecimalValue = null;
-                                if (maxValueString != null) {
-                                    DecimalFormat df = new DecimalFormat();
-                                    df.setParseBigDecimal(true);
-                                    maxBigDecimalValue = (BigDecimal) df.parse(maxValueString);
-                                }
-                                if (maxBigDecimalValue == null || colBigDecimalValue.compareTo(maxBigDecimalValue) > 0) {
-                                    newColMap.put(colName, colBigDecimalValue.toString());
-                                }
-                                break;
-
-                            case DATE:
-                                Date rawColDateValue = resultSet.getDate(i);
-                                java.sql.Date colDateValue = new java.sql.Date(rawColDateValue.getTime());
-                                java.sql.Date maxDateValue = null;
-                                if (maxValueString != null) {
-                                    maxDateValue = java.sql.Date.valueOf(maxValueString);
-                                }
-                                if (maxDateValue == null || colDateValue.after(maxDateValue)) {
-                                    newColMap.put(colName, colDateValue.toString());
-                                }
-                                break;
-
-                            case TIME:
-                                Date rawColTimeValue = resultSet.getDate(i);
-                                java.sql.Time colTimeValue = new java.sql.Time(rawColTimeValue.getTime());
-                                java.sql.Time maxTimeValue = null;
-                                if (maxValueString != null) {
-                                    maxTimeValue = java.sql.Time.valueOf(maxValueString);
-                                }
-                                if (maxTimeValue == null || colTimeValue.after(maxTimeValue)) {
-                                    newColMap.put(colName, colTimeValue.toString());
-                                }
-                                break;
-
-                            case TIMESTAMP:
-                                // Oracle timestamp queries must use literals in java.sql.Date format
-                                if (SQL_PREPROCESS_STRATEGY_ORACLE.equals(preProcessStrategy)) {
-                                    Date rawColOracleTimestampValue = resultSet.getDate(i);
-                                    java.sql.Date oracleTimestampValue = new java.sql.Date(rawColOracleTimestampValue.getTime());
-                                    java.sql.Date maxOracleTimestampValue = null;
-                                    if (maxValueString != null) {
-                                        maxOracleTimestampValue = java.sql.Date.valueOf(maxValueString);
-                                    }
-                                    if (maxOracleTimestampValue == null || oracleTimestampValue.after(maxOracleTimestampValue)) {
-                                        newColMap.put(colName, oracleTimestampValue.toString());
-                                    }
-                                } else {
-                                    Timestamp rawColTimestampValue = resultSet.getTimestamp(i);
-                                    java.sql.Timestamp colTimestampValue = new java.sql.Timestamp(rawColTimestampValue.getTime());
-                                    java.sql.Timestamp maxTimestampValue = null;
-                                    if (maxValueString != null) {
-                                        maxTimestampValue = java.sql.Timestamp.valueOf(maxValueString);
-                                    }
-                                    if (maxTimestampValue == null || colTimestampValue.after(maxTimestampValue)) {
-                                        newColMap.put(colName, colTimestampValue.toString());
-                                    }
-                                }
-                                break;
-
-                            case BIT:
-                            case BOOLEAN:
-                            case BINARY:
-                            case VARBINARY:
-                            case LONGVARBINARY:
-                            case ARRAY:
-                            case BLOB:
-                            case CLOB:
-                            default:
-                                throw new IOException("Type " + meta.getColumnTypeName(i) + " is not valid for maintaining maximum value");
+                        String newMaxValueString = getMaxValueFromRow(resultSet, i, type, maxValueString, dbAdapter.getName());
+                        if (newMaxValueString != null) {
+                            newColMap.put(colName, newMaxValueString);
                         }
                     }
                 }
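The per-type switch removed above is consolidated into getMaxValueFromRow() in the new AbstractDatabaseFetchProcessor, keyed on the JDBC type and the adapter name. As an illustration only (the class and method names below are hypothetical and only two JDBC types are shown), the kind of "keep the larger value as a String" comparison that helper performs looks roughly like:

    import java.sql.ResultSet;
    import java.sql.SQLException;
    import java.sql.Types;

    // Hypothetical sketch, not the actual AbstractDatabaseFetchProcessor code.
    class MaxValueSketch {
        // Returns the larger of the column value and the current maximum, as a String.
        static String maxOf(ResultSet rs, int col, int sqlType, String currentMax) throws SQLException {
            switch (sqlType) {
                case Types.INTEGER:
                    final int intVal = rs.getInt(col);
                    return (currentMax == null || intVal > Integer.parseInt(currentMax))
                            ? String.valueOf(intVal) : currentMax;
                case Types.VARCHAR:
                    final String strVal = rs.getString(col);
                    return (currentMax == null || strVal.compareTo(currentMax) > 0)
                            ? strVal : currentMax;
                default:
                    return currentMax; // dates, timestamps, decimals, etc. omitted in this sketch
            }
        }
    }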

http://git-wip-us.apache.org/repos/asf/nifi/blob/01cae237/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/db/DatabaseAdapter.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/db/DatabaseAdapter.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/db/DatabaseAdapter.java
new file mode 100644
index 0000000..21ab331
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/db/DatabaseAdapter.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.standard.db;
+
+/**
+ * Interface for RDBMS/JDBC-specific code.
+ */
+public interface DatabaseAdapter {
+
+    String getName();
+
+    /**
+     * Returns a SQL SELECT statement with the given clauses applied.
+     *
+     * @param tableName     The name of the table to fetch rows from
+     * @param columnNames   The names of the columns to fetch from the table
+     * @param whereClause   The filter to apply to the statement. This should not include the WHERE keyword
+     * @param orderByClause The columns/clause used for ordering the result rows. This should not include the ORDER BY keywords
+     * @param limit         The value for the LIMIT clause (i.e. the number of rows to return)
+     * @param offset        The value for the OFFSET clause (i.e. the number of rows to skip)
+     * @return A String containing a SQL SELECT statement with the given clauses applied
+     */
+    String getSelectStatement(String tableName, String columnNames, String whereClause, String orderByClause, Integer limit, Integer offset);
+}
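As a usage sketch of the new interface (the table, column names, and clause values are made up for illustration; the WHERE and ORDER BY keywords are supplied by the adapter, so only the clause bodies are passed in):

    import org.apache.nifi.processors.standard.db.DatabaseAdapter;
    import org.apache.nifi.processors.standard.db.impl.GenericDatabaseAdapter;

    public class SelectStatementExample {
        public static void main(String[] args) {
            final DatabaseAdapter adapter = new GenericDatabaseAdapter();
            // Fetch one page of 1000 rows, skipping the first 2000 matching rows.
            final String sql = adapter.getSelectStatement("orders", "id, total", "id > 509", "id", 1000, 2000);
            System.out.println(sql);
            // With the Generic adapter shown below, this should print:
            // SELECT id, total FROM orders WHERE id > 509 ORDER BY id LIMIT 1000 OFFSET 2000
        }
    }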

http://git-wip-us.apache.org/repos/asf/nifi/blob/01cae237/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/db/impl/GenericDatabaseAdapter.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/db/impl/GenericDatabaseAdapter.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/db/impl/GenericDatabaseAdapter.java
new file mode 100644
index 0000000..5beae29
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/db/impl/GenericDatabaseAdapter.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.standard.db.impl;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.nifi.processors.standard.db.DatabaseAdapter;
+
+/**
+ * A generic database adapter that generates ANSI SQL.
+ */
+public class GenericDatabaseAdapter implements DatabaseAdapter {
+    @Override
+    public String getName() {
+        return "Generic";
+    }
+
+    @Override
+    public String getSelectStatement(String tableName, String columnNames, String whereClause, String orderByClause, Integer limit, Integer offset) {
+        if (StringUtils.isEmpty(tableName)) {
+            throw new IllegalArgumentException("Table name cannot be null or empty");
+        }
+        final StringBuilder query = new StringBuilder("SELECT ");
+        if (StringUtils.isEmpty(columnNames) || columnNames.trim().equals("*")) {
+            query.append("*");
+        } else {
+            query.append(columnNames);
+        }
+        query.append(" FROM ");
+        query.append(tableName);
+
+        if (!StringUtils.isEmpty(whereClause)) {
+            query.append(" WHERE ");
+            query.append(whereClause);
+        }
+        if (!StringUtils.isEmpty(orderByClause)) {
+            query.append(" ORDER BY ");
+            query.append(orderByClause);
+        }
+        if (limit != null) {
+            query.append(" LIMIT ");
+            query.append(limit);
+        }
+        if (offset != null && offset > 0) {
+            query.append(" OFFSET ");
+            query.append(offset);
+        }
+
+        return query.toString();
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/01cae237/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/db/impl/OracleDatabaseAdapter.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/db/impl/OracleDatabaseAdapter.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/db/impl/OracleDatabaseAdapter.java
new file mode 100644
index 0000000..ffe54dc
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/db/impl/OracleDatabaseAdapter.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.standard.db.impl;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.nifi.processors.standard.db.DatabaseAdapter;
+
+/**
+ * A DatabaseAdapter that generates Oracle-compliant SQL.
+ */
+public class OracleDatabaseAdapter implements DatabaseAdapter {
+    @Override
+    public String getName() {
+        return "Oracle";
+    }
+
+    @Override
+    public String getSelectStatement(String tableName, String columnNames, String whereClause, String orderByClause, Integer limit, Integer offset) {
+        if (StringUtils.isEmpty(tableName)) {
+            throw new IllegalArgumentException("Table name cannot be null or empty");
+        }
+
+        final StringBuilder query = new StringBuilder();
+        boolean nestedSelect = (limit != null || offset != null);
+        if (nestedSelect) {
+            // Need a nested SELECT query here in order to use ROWNUM to limit the results
+            query.append("SELECT ");
+            if (StringUtils.isEmpty(columnNames) || columnNames.trim().equals("*")) {
+                query.append("*");
+            } else {
+                query.append(columnNames);
+            }
+            query.append(" FROM (SELECT a.*, ROWNUM rnum FROM (");
+        }
+
+        query.append("SELECT ");
+        if (StringUtils.isEmpty(columnNames) || columnNames.trim().equals("*")) {
+            query.append("*");
+        } else {
+            query.append(columnNames);
+        }
+        query.append(" FROM ");
+        query.append(tableName);
+
+        if (!StringUtils.isEmpty(whereClause)) {
+            query.append(" WHERE ");
+            query.append(whereClause);
+        }
+        if (!StringUtils.isEmpty(orderByClause)) {
+            query.append(" ORDER BY ");
+            query.append(orderByClause);
+        }
+        if (nestedSelect) {
+            query.append(") a");
+            int offsetVal = 0;
+            if (offset != null) {
+                offsetVal = offset;
+            }
+            if (limit != null) {
+                query.append(" WHERE ROWNUM <= ");
+                query.append(offsetVal + limit);
+            }
+            query.append(") WHERE rnum > ");
+            query.append(offsetVal);
+        }
+        return query.toString();
+    }
+}
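Older Oracle versions have no LIMIT/OFFSET clause, so this adapter nests the query and filters on ROWNUM instead. An illustrative call (same made-up table and values as in the earlier sketch) and the statement it should produce:

    import org.apache.nifi.processors.standard.db.impl.OracleDatabaseAdapter;

    public class OraclePagingExample {
        public static void main(String[] args) {
            // Same paging request as before: 1000 rows after skipping 2000.
            final String sql = new OracleDatabaseAdapter()
                    .getSelectStatement("orders", "id, total", "id > 509", "id", 1000, 2000);
            System.out.println(sql);
            // Expected (whitespace added for readability):
            // SELECT id, total FROM (
            //   SELECT a.*, ROWNUM rnum FROM (
            //     SELECT id, total FROM orders WHERE id > 509 ORDER BY id
            //   ) a WHERE ROWNUM <= 3000
            // ) WHERE rnum > 2000
        }
    }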

http://git-wip-us.apache.org/repos/asf/nifi/blob/01cae237/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
index 6ed7f5b..2ff466e 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
@@ -43,6 +43,7 @@ org.apache.nifi.processors.standard.HashContent
 org.apache.nifi.processors.standard.IdentifyMimeType
 org.apache.nifi.processors.standard.InvokeHTTP
 org.apache.nifi.processors.standard.JoltTransformJSON
+org.apache.nifi.processors.standard.GenerateTableFetch
 org.apache.nifi.processors.standard.GetJMSQueue
 org.apache.nifi.processors.standard.GetJMSTopic
 org.apache.nifi.processors.standard.ListFile

http://git-wip-us.apache.org/repos/asf/nifi/blob/01cae237/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processors.standard.db.DatabaseAdapter
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processors.standard.db.DatabaseAdapter b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processors.standard.db.DatabaseAdapter
new file mode 100644
index 0000000..0e3685a
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processors.standard.db.DatabaseAdapter
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+org.apache.nifi.processors.standard.db.impl.GenericDatabaseAdapter
+org.apache.nifi.processors.standard.db.impl.OracleDatabaseAdapter
\ No newline at end of file
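This service file follows the java.util.ServiceLoader convention, so the adapter implementations can be discovered at runtime and selected by the value returned from getName(). A minimal sketch of such a lookup (the map-building here is illustrative, not necessarily how the processors wire it up):

    import java.util.HashMap;
    import java.util.Map;
    import java.util.ServiceLoader;
    import org.apache.nifi.processors.standard.db.DatabaseAdapter;

    public class AdapterLookupExample {
        public static void main(String[] args) {
            // Index every registered adapter by its display name, e.g. "Generic" or "Oracle".
            final Map<String, DatabaseAdapter> adapters = new HashMap<>();
            for (DatabaseAdapter adapter : ServiceLoader.load(DatabaseAdapter.class)) {
                adapters.put(adapter.getName(), adapter);
            }
            System.out.println(adapters.keySet()); // e.g. [Generic, Oracle]
        }
    }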

http://git-wip-us.apache.org/repos/asf/nifi/blob/01cae237/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/QueryDatabaseTableTest.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/QueryDatabaseTableTest.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/QueryDatabaseTableTest.java
index f932e4d..5aec899 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/QueryDatabaseTableTest.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/QueryDatabaseTableTest.java
@@ -26,11 +26,16 @@ import org.apache.nifi.components.state.StateManager;
 import org.apache.nifi.controller.AbstractControllerService;
 import org.apache.nifi.dbcp.DBCPService;
 import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processors.standard.db.DatabaseAdapter;
+import org.apache.nifi.processors.standard.db.impl.GenericDatabaseAdapter;
+import org.apache.nifi.processors.standard.db.impl.OracleDatabaseAdapter;
 import org.apache.nifi.reporting.InitializationException;
 import org.apache.nifi.util.TestRunner;
 import org.apache.nifi.util.TestRunners;
+import org.apache.nifi.util.file.FileUtils;
 import org.fusesource.hawtbuf.ByteArrayInputStream;
 import org.junit.After;
+import org.junit.AfterClass;
 import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Test;
@@ -41,6 +46,7 @@ import java.io.InputStream;
 import java.sql.Connection;
 import java.sql.DriverManager;
 import java.sql.SQLException;
+import java.sql.SQLNonTransientConnectionException;
 import java.sql.Statement;
 import java.sql.Types;
 import java.util.Arrays;
@@ -58,24 +64,52 @@ public class QueryDatabaseTableTest {
 
     MockQueryDatabaseTable processor;
     private TestRunner runner;
-    final static String DB_LOCATION = "target/db";
+    private final static String DB_LOCATION = "target/db_qdt";
+    private DatabaseAdapter dbAdapter;
 
 
     @BeforeClass
-    public static void setupClass() {
+    public static void setupBeforeClass() throws IOException {
         System.setProperty("derby.stream.error.file", "target/derby.log");
+
+        // remove previous test database, if any
+        final File dbLocation = new File(DB_LOCATION);
+        try {
+            FileUtils.deleteFile(dbLocation, true);
+        } catch (IOException ioe) {
+            // Do nothing, may not have existed
+        }
+    }
+
+    @AfterClass
+    public static void cleanUpAfterClass() throws Exception {
+        try {
+            DriverManager.getConnection("jdbc:derby:" + DB_LOCATION + ";shutdown=true");
+        } catch (SQLNonTransientConnectionException e) {
+            // Do nothing, this is what happens at Derby shutdown
+        }
+        // remove previous test database, if any
+        final File dbLocation = new File(DB_LOCATION);
+        try {
+            FileUtils.deleteFile(dbLocation, true);
+        } catch (IOException ioe) {
+            // Do nothing, may not have existed
+        }
     }
 
+
     @Before
     public void setup() throws InitializationException, IOException {
         final DBCPService dbcp = new DBCPServiceSimpleImpl();
         final Map<String, String> dbcpProperties = new HashMap<>();
+        dbAdapter = new GenericDatabaseAdapter();
 
         processor = new MockQueryDatabaseTable();
         runner = TestRunners.newTestRunner(processor);
         runner.addControllerService("dbcp", dbcp, dbcpProperties);
         runner.enableControllerService(dbcp);
         runner.setProperty(QueryDatabaseTable.DBCP_SERVICE, "dbcp");
+        runner.setProperty(QueryDatabaseTable.DB_TYPE, dbAdapter.getName());
         runner.getStateManager().clear(Scope.CLUSTER);
     }
 
@@ -85,20 +119,13 @@ public class QueryDatabaseTableTest {
     }
 
     @Test
-    public void testGetColumns() throws Exception {
-        assertTrue(processor.getColumns(null).isEmpty());
-        assertTrue(processor.getColumns("").isEmpty());
-        assertEquals(2, processor.getColumns("col1,col2").size());
-    }
-
-    @Test
     public void testGetQuery() throws Exception {
-        String query = processor.getQuery("myTable", null, null, null, "None");
+        String query = processor.getQuery(dbAdapter, "myTable", null, null, null);
         assertEquals("SELECT * FROM myTable", query);
-        query = processor.getQuery("myTable", "col1,col2", null, null, "None");
+        query = processor.getQuery(dbAdapter, "myTable", "col1,col2", null, null);
         assertEquals("SELECT col1,col2 FROM myTable", query);
 
-        query = processor.getQuery("myTable", null, Collections.singletonList("id"), null, "None");
+        query = processor.getQuery(dbAdapter, "myTable", null, Collections.singletonList("id"), null);
         assertEquals("SELECT * FROM myTable", query);
 
         Map<String, String> maxValues = new HashMap<>();
@@ -106,31 +133,29 @@ public class QueryDatabaseTableTest {
         StateManager stateManager = runner.getStateManager();
         stateManager.setState(maxValues, Scope.CLUSTER);
         processor.putColumnType("id", Types.INTEGER);
-        query = processor.getQuery("myTable", null, Collections.singletonList("id"), stateManager.getState(Scope.CLUSTER), "None");
+        query = processor.getQuery(dbAdapter, "myTable", null, Collections.singletonList("id"), stateManager.getState(Scope.CLUSTER));
         assertEquals("SELECT * FROM myTable WHERE id > 509", query);
 
         maxValues.put("date_created", "2016-03-07 12:34:56");
         stateManager.setState(maxValues, Scope.CLUSTER);
         processor.putColumnType("date_created", Types.TIMESTAMP);
-        query = processor.getQuery("myTable", null, Arrays.asList("id", "DATE_CREATED"), stateManager.getState(Scope.CLUSTER), "None");
+        query = processor.getQuery(dbAdapter, "myTable", null, Arrays.asList("id", "DATE_CREATED"), stateManager.getState(Scope.CLUSTER));
         assertEquals("SELECT * FROM myTable WHERE id > 509 AND DATE_CREATED > '2016-03-07 12:34:56'", query);
+
         // Test Oracle strategy
-        query = processor.getQuery("myTable", null, Arrays.asList("id", "DATE_CREATED"), stateManager.getState(Scope.CLUSTER), "Oracle");
+        dbAdapter = new OracleDatabaseAdapter();
+        query = processor.getQuery(dbAdapter, "myTable", null, Arrays.asList("id", "DATE_CREATED"), stateManager.getState(Scope.CLUSTER));
         assertEquals("SELECT * FROM myTable WHERE id > 509 AND DATE_CREATED > to_date('2016-03-07 12:34:56', 'yyyy-mm-dd HH24:MI:SS')", query);
     }
 
     @Test(expected = IllegalArgumentException.class)
     public void testGetQueryNoTable() throws Exception {
-        processor.getQuery(null, null, null, null, "None");
+        processor.getQuery(dbAdapter, null, null, null, null);
     }
 
     @Test
     public void testAddedRows() throws ClassNotFoundException, SQLException, InitializationException, IOException {
 
-        // remove previous test database, if any
-        final File dbLocation = new File(DB_LOCATION);
-        dbLocation.delete();
-
         // load test data to database
         final Connection con = ((DBCPService) runner.getControllerService("dbcp")).getConnection();
         Statement stmt = con.createStatement();
@@ -256,10 +281,6 @@ public class QueryDatabaseTableTest {
 
     @Test
     public void testWithNullIntColumn() throws SQLException {
-        // remove previous test database, if any
-        final File dbLocation = new File(DB_LOCATION);
-        dbLocation.delete();
-
         // load test data to database
         final Connection con = ((DBCPService) runner.getControllerService("dbcp")).getConnection();
         Statement stmt = con.createStatement();
@@ -285,10 +306,6 @@ public class QueryDatabaseTableTest {
 
     @Test
     public void testWithSqlException() throws SQLException {
-        // remove previous test database, if any
-        final File dbLocation = new File(DB_LOCATION);
-        dbLocation.delete();
-
         // load test data to database
         final Connection con = ((DBCPService) runner.getControllerService("dbcp")).getConnection();
         Statement stmt = con.createStatement();
@@ -330,7 +347,7 @@ public class QueryDatabaseTableTest {
     /**
      * Simple implementation only for QueryDatabaseTable processor testing.
      */
-    class DBCPServiceSimpleImpl extends AbstractControllerService implements DBCPService {
+    private class DBCPServiceSimpleImpl extends AbstractControllerService implements DBCPService {
 
         @Override
         public String getIdentifier() {
@@ -350,7 +367,7 @@ public class QueryDatabaseTableTest {
 
     @Stateful(scopes = Scope.CLUSTER, description = "Mock for QueryDatabaseTable processor")
     private static class MockQueryDatabaseTable extends QueryDatabaseTable {
-        public void putColumnType(String colName, Integer colType) {
+        void putColumnType(String colName, Integer colType) {
             columnTypeMap.put(colName, colType);
         }
     }