Posted to commits@nifi.apache.org by pv...@apache.org on 2018/06/08 07:36:37 UTC

nifi git commit: NIFI-5143: Initial work to support column values for paging results

Repository: nifi
Updated Branches:
  refs/heads/master 729f8aa24 -> 0e09b98b0


NIFI-5143: Initial work to support column values for paging results

Signed-off-by: Pierre Villard <pi...@gmail.com>

This closes #2728.


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/0e09b98b
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/0e09b98b
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/0e09b98b

Branch: refs/heads/master
Commit: 0e09b98b02bcfae21841cb7dda176c7c1c33f4d8
Parents: 729f8aa
Author: Matthew Burgess <ma...@apache.org>
Authored: Mon May 21 13:45:16 2018 -0400
Committer: Pierre Villard <pi...@gmail.com>
Committed: Fri Jun 8 09:36:27 2018 +0200

----------------------------------------------------------------------
 .../processors/standard/GenerateTableFetch.java | 100 +++++++++---
 .../processors/standard/db/DatabaseAdapter.java |  17 ++
 .../db/impl/GenericDatabaseAdapter.java         |  36 +++--
 .../db/impl/MSSQL2008DatabaseAdapter.java       |  27 +++-
 .../standard/db/impl/MSSQLDatabaseAdapter.java  |  43 ++++--
 .../db/impl/Oracle12DatabaseAdapter.java        |  39 +++--
 .../standard/db/impl/OracleDatabaseAdapter.java |  22 ++-
 .../db/impl/PhoenixDatabaseAdapter.java         |  40 +++--
 .../additionalDetails.html                      |  69 +++++++++
 .../standard/TestGenerateTableFetch.java        | 154 +++++++++++++++++--
 .../standard/db/impl/DerbyDatabaseAdapter.java  |  41 +++--
 .../db/impl/TestMSSQL2008DatabaseAdapter.java   |  24 +++
 .../db/impl/TestMSSQLDatabaseAdapter.java       |  24 +++
 .../db/impl/TestOracle12DatabaseAdapter.java    |  53 +++++--
 .../db/impl/TestOracleDatabaseAdapter.java      | 101 ++++++++++++
 15 files changed, 685 insertions(+), 105 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/0e09b98b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GenerateTableFetch.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GenerateTableFetch.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GenerateTableFetch.java
index a2d01e9..9842d6c 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GenerateTableFetch.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GenerateTableFetch.java
@@ -30,6 +30,7 @@ import org.apache.nifi.annotation.documentation.Tags;
 import org.apache.nifi.annotation.lifecycle.OnScheduled;
 import org.apache.nifi.annotation.lifecycle.OnStopped;
 import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.PropertyValue;
 import org.apache.nifi.components.ValidationContext;
 import org.apache.nifi.components.ValidationResult;
 import org.apache.nifi.components.state.Scope;
@@ -98,8 +99,8 @@ import java.util.stream.IntStream;
         @WritesAttribute(attribute = "generatetablefetch.offset", description = "Offset to be used to retrieve the corresponding partition.")
 })
 @DynamicProperty(name = "Initial Max Value", value = "Attribute Expression Language",
-                 expressionLanguageScope = ExpressionLanguageScope.NONE, description = "Specifies an initial "
-                         + "max value for max value columns. Properties should be added in the format `initial.maxvalue.{max_value_column}`.")
+        expressionLanguageScope = ExpressionLanguageScope.NONE, description = "Specifies an initial "
+        + "max value for max value columns. Properties should be added in the format `initial.maxvalue.{max_value_column}`.")
 public class GenerateTableFetch extends AbstractDatabaseFetchProcessor {
 
     public static final PropertyDescriptor PARTITION_SIZE = new PropertyDescriptor.Builder()
@@ -115,6 +116,19 @@ public class GenerateTableFetch extends AbstractDatabaseFetchProcessor {
             .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
             .build();
 
+    static final PropertyDescriptor COLUMN_FOR_VALUE_PARTITIONING = new PropertyDescriptor.Builder()
+            .name("gen-table-column-for-val-partitioning")
+            .displayName("Column for Value Partitioning")
+            .description("The name of a column whose values will be used for partitioning. The default behavior is to use row numbers on the result set for partitioning into "
+                    + "'pages' to be fetched from the database, using an offset/limit strategy. However for certain databases, it can be more efficient under the right circumstances to use "
+                    + "the column values themselves to define the 'pages'. This property should only be used when the default queries are not performing well, when there is no maximum-value "
+                    + "column or a single maximum-value column whose type can be coerced to a long integer (i.e. not date or timestamp), and the column values are evenly distributed and not "
+                    + "sparse, for best performance.")
+            .required(false)
+            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .build();
+
     public static final Relationship REL_FAILURE = new Relationship.Builder()
             .name("failure")
             .description("This relationship is only used when SQL query execution (using an incoming FlowFile) failed. The incoming FlowFile will be penalized and routed to this relationship. "
@@ -135,6 +149,7 @@ public class GenerateTableFetch extends AbstractDatabaseFetchProcessor {
         pds.add(MAX_VALUE_COLUMN_NAMES);
         pds.add(QUERY_TIMEOUT);
         pds.add(PARTITION_SIZE);
+        pds.add(COLUMN_FOR_VALUE_PARTITIONING);
         pds.add(WHERE_CLAUSE);
         propDescriptors = Collections.unmodifiableList(pds);
     }
@@ -151,7 +166,15 @@ public class GenerateTableFetch extends AbstractDatabaseFetchProcessor {
 
     @Override
     protected Collection<ValidationResult> customValidate(ValidationContext validationContext) {
-        return super.customValidate(validationContext);
+        List<ValidationResult> results = new ArrayList<>(super.customValidate(validationContext));
+        final PropertyValue columnForPartitioning = validationContext.getProperty(COLUMN_FOR_VALUE_PARTITIONING);
+        // If no EL is present, ensure it's a single column (i.e. no commas in the property value)
+        if (columnForPartitioning.isSet() && !columnForPartitioning.isExpressionLanguagePresent() && columnForPartitioning.getValue().contains(",")) {
+            results.add(new ValidationResult.Builder().valid(false).explanation(
+                    COLUMN_FOR_VALUE_PARTITIONING.getDisplayName() + " requires a single column name, but a comma was detected").build());
+        }
+
+        return results;
     }
 
     @Override
@@ -195,6 +218,8 @@ public class GenerateTableFetch extends AbstractDatabaseFetchProcessor {
         final String columnNames = context.getProperty(COLUMN_NAMES).evaluateAttributeExpressions(fileToProcess).getValue();
         final String maxValueColumnNames = context.getProperty(MAX_VALUE_COLUMN_NAMES).evaluateAttributeExpressions(fileToProcess).getValue();
         final int partitionSize = context.getProperty(PARTITION_SIZE).evaluateAttributeExpressions(fileToProcess).asInteger();
+        final String columnForPartitioning = context.getProperty(COLUMN_FOR_VALUE_PARTITIONING).evaluateAttributeExpressions(fileToProcess).getValue();
+        final boolean useColumnValsForPaging = !StringUtils.isEmpty(columnForPartitioning);
         final String customWhereClause = context.getProperty(WHERE_CLAUSE).evaluateAttributeExpressions(fileToProcess).getValue();
 
         final StateManager stateManager = context.getStateManager();
@@ -241,20 +266,24 @@ public class GenerateTableFetch extends AbstractDatabaseFetchProcessor {
             List<String> maxValueColumnNameList = StringUtils.isEmpty(maxValueColumnNames)
                     ? new ArrayList<>(0)
                     : Arrays.asList(maxValueColumnNames.split("\\s*,\\s*"));
-            List<String> maxValueClauses = new ArrayList<>(maxValueColumnNameList.size());
+            final int numMaxValueColumns = maxValueColumnNameList.size();
+
+            List<String> maxValueClauses = new ArrayList<>(numMaxValueColumns);
+            Long maxValueForPartitioning = null;
+            Long minValueForPartitioning = null;
 
             String columnsClause = null;
-            List<String> maxValueSelectColumns = new ArrayList<>(maxValueColumnNameList.size() + 1);
+            List<String> maxValueSelectColumns = new ArrayList<>(numMaxValueColumns + 1);
             maxValueSelectColumns.add("COUNT(*)");
 
             // For each maximum-value column, get a WHERE filter and a MAX(column) alias
-            IntStream.range(0, maxValueColumnNameList.size()).forEach((index) -> {
+            IntStream.range(0, numMaxValueColumns).forEach((index) -> {
                 String colName = maxValueColumnNameList.get(index);
 
                 maxValueSelectColumns.add("MAX(" + colName + ") " + colName);
                 String maxValue = getColumnStateMaxValue(tableName, statePropertyMap, colName, dbAdapter);
                 if (!StringUtils.isEmpty(maxValue)) {
-                    if(columnTypeMap.isEmpty() || getColumnType(tableName, colName, dbAdapter) == null){
+                    if (columnTypeMap.isEmpty() || getColumnType(tableName, colName, dbAdapter) == null) {
                         // This means column type cache is clean after instance reboot. We should re-cache column type
                         super.setup(context, false, finalFileToProcess);
                     }
@@ -263,8 +292,18 @@ public class GenerateTableFetch extends AbstractDatabaseFetchProcessor {
                     // Add a condition for the WHERE clause
                     maxValueClauses.add(colName + (index == 0 ? " > " : " >= ") + getLiteralByType(type, maxValue, dbAdapter.getName()));
                 }
+
             });
 
+            // If we are using a columns' values, get the maximum and minimum values in the context of the aforementioned WHERE clause
+            if (useColumnValsForPaging) {
+                if(columnForPartitioning.contains(",")) {
+                    throw new ProcessException(COLUMN_FOR_VALUE_PARTITIONING.getDisplayName() + " requires a single column name, but a comma was detected");
+                }
+                maxValueSelectColumns.add("MAX(" + columnForPartitioning + ") " + columnForPartitioning);
+                maxValueSelectColumns.add("MIN(" + columnForPartitioning + ") MIN_" + columnForPartitioning);
+            }
+
             if (customWhereClause != null) {
                 // adding the custom WHERE clause (if defined) to the list of existing clauses.
                 maxValueClauses.add("(" + customWhereClause + ")");
@@ -294,7 +333,8 @@ public class GenerateTableFetch extends AbstractDatabaseFetchProcessor {
 
                     // Update the state map with the newly-observed maximum values
                     ResultSetMetaData rsmd = resultSet.getMetaData();
-                    for (int i = 2; i <= rsmd.getColumnCount(); i++) {
+                    int i = 2;
+                    for (; i <= numMaxValueColumns + 1; i++) {
                         //Some JDBC drivers consider the columns name and label to be very different things.
                         // Since this column has been aliased lets check the label first,
                         // if there is no label we'll use the column name.
@@ -318,11 +358,18 @@ public class GenerateTableFetch extends AbstractDatabaseFetchProcessor {
                             if (newMaxValue != null) {
                                 statePropertyMap.put(fullyQualifiedStateKey, newMaxValue);
                             }
-                        } catch (ParseException | IOException pie) {
+                        } catch (ParseException | IOException | ClassCastException pice) {
                             // Fail the whole thing here before we start creating flow files and such
-                            throw new ProcessException(pie);
+                            throw new ProcessException(pice);
                         }
-
+                    }
+                    // Process the maximum and minimum values for the partitioning column if necessary
+                    // These are currently required to be Long values, will throw a ClassCastException if they are not
+                    if (useColumnValsForPaging) {
+                        Object o = resultSet.getObject(i);
+                        maxValueForPartitioning = o == null ? null : Long.valueOf(o.toString());
+                        o = resultSet.getObject(i + 1);
+                        minValueForPartitioning = o == null ? null : Long.valueOf(o.toString());
                     }
                 } else {
                     // Something is very wrong here, one row (even if count is zero) should be returned
@@ -330,13 +377,13 @@ public class GenerateTableFetch extends AbstractDatabaseFetchProcessor {
                 }
 
                 // for each maximum-value column get a right bounding WHERE condition
-                IntStream.range(0, maxValueColumnNameList.size()).forEach((index) -> {
+                IntStream.range(0, numMaxValueColumns).forEach((index) -> {
                     String colName = maxValueColumnNameList.get(index);
 
                     maxValueSelectColumns.add("MAX(" + colName + ") " + colName);
                     String maxValue = getColumnStateMaxValue(tableName, statePropertyMap, colName, dbAdapter);
                     if (!StringUtils.isEmpty(maxValue)) {
-                        if(columnTypeMap.isEmpty() || getColumnType(tableName, colName, dbAdapter) == null){
+                        if (columnTypeMap.isEmpty() || getColumnType(tableName, colName, dbAdapter) == null) {
                             // This means column type cache is clean after instance reboot. We should re-cache column type
                             super.setup(context, false, finalFileToProcess);
                         }
@@ -347,17 +394,30 @@ public class GenerateTableFetch extends AbstractDatabaseFetchProcessor {
                     }
                 });
 
-                //Update WHERE list to include new right hand boundaries
-                whereClause = StringUtils.join(maxValueClauses, " AND ");
-
-                final long numberOfFetches = (partitionSize == 0) ? 1 : (rowCount / partitionSize) + (rowCount % partitionSize == 0 ? 0 : 1);
+                final long numberOfFetches;
+                if (useColumnValsForPaging) {
+                    final long valueRangeSize = maxValueForPartitioning == null ? 0 : (maxValueForPartitioning - minValueForPartitioning + 1);
+                    numberOfFetches = (partitionSize == 0) ? 1 : (valueRangeSize / partitionSize) + (valueRangeSize % partitionSize == 0 ? 0 : 1);
+                } else {
+                    numberOfFetches = (partitionSize == 0) ? 1 : (rowCount / partitionSize) + (rowCount % partitionSize == 0 ? 0 : 1);
+                }
 
                 // Generate SQL statements to read "pages" of data
+                Long limit = partitionSize == 0 ? null : (long) partitionSize;
                 for (long i = 0; i < numberOfFetches; i++) {
-                    Long limit = partitionSize == 0 ? null : (long) partitionSize;
-                    Long offset = partitionSize == 0 ? null : i * partitionSize;
+                    // Add a right bounding for the partitioning column if necessary (only on last partition, meaning we don't need the limit)
+                    if ((i == numberOfFetches - 1) && useColumnValsForPaging && (maxValueClauses.isEmpty() || customWhereClause != null)) {
+                        maxValueClauses.add(columnForPartitioning + " <= " + maxValueForPartitioning);
+                        limit = null;
+                    }
+
+                    //Update WHERE list to include new right hand boundaries
+                    whereClause = maxValueClauses.isEmpty() ? "1=1" : StringUtils.join(maxValueClauses, " AND ");
+
+                    Long offset = partitionSize == 0 ? null : i * partitionSize + (useColumnValsForPaging ? minValueForPartitioning : 0);
+
                     final String maxColumnNames = StringUtils.join(maxValueColumnNameList, ", ");
-                    final String query = dbAdapter.getSelectStatement(tableName, columnNames, whereClause, maxColumnNames, limit, offset);
+                    final String query = dbAdapter.getSelectStatement(tableName, columnNames, whereClause, maxColumnNames, limit, offset, columnForPartitioning);
                     FlowFile sqlFlowFile = (fileToProcess == null) ? session.create() : session.create(fileToProcess);
                     sqlFlowFile = session.write(sqlFlowFile, out -> out.write(query.getBytes()));
                     sqlFlowFile = session.putAttribute(sqlFlowFile, "generatetablefetch.tableName", tableName);
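
A minimal, self-contained sketch of the paging arithmetic introduced in the hunk above. The sample values (a partitioning column ranging from 100 to 200 and a partition size of 10) are assumptions chosen for illustration, not taken from the commit:

    public class ValuePartitioningMathSketch {
        public static void main(String[] args) {
            // Assumed sample values: MIN(col) = 100, MAX(col) = 200, Partition Size = 10
            final long minValueForPartitioning = 100L;
            final long maxValueForPartitioning = 200L;
            final int partitionSize = 10;

            // Same calculation as above: when a Column for Value Partitioning is set,
            // pages are cut from the column's value range rather than from the row count
            final long valueRangeSize = maxValueForPartitioning - minValueForPartitioning + 1; // 101
            final long numberOfFetches = (partitionSize == 0) ? 1
                    : (valueRangeSize / partitionSize) + (valueRangeSize % partitionSize == 0 ? 0 : 1); // 11

            for (long i = 0; i < numberOfFetches; i++) {
                // The "offset" is a column value, seeded with the observed minimum
                final long offset = i * partitionSize + minValueForPartitioning; // 100, 110, ..., 200
                System.out.println("page " + i + ": col >= " + offset + " AND col < " + (offset + partitionSize));
            }
        }
    }

Note that on the final page the processor may instead emit a closed upper bound (col <= MAX) and drop the limit, as handled by the last-iteration branch in the loop above.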

http://git-wip-us.apache.org/repos/asf/nifi/blob/0e09b98b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/db/DatabaseAdapter.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/db/DatabaseAdapter.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/db/DatabaseAdapter.java
index 4e1ad47..211d4c8 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/db/DatabaseAdapter.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/db/DatabaseAdapter.java
@@ -39,6 +39,23 @@ public interface DatabaseAdapter {
     String getSelectStatement(String tableName, String columnNames, String whereClause, String orderByClause, Long limit, Long offset);
 
     /**
+     * Returns a SQL SELECT statement with the given clauses applied. Note that if this method is overridden, the other overloaded methods
+     * need to be overridden as well, to call this method with columnForPartitioning = false
+     *
+     * @param tableName             The name of the table to fetch rows from
+     * @param columnNames           The names of the columns to fetch from the table
+     * @param whereClause           The filter to apply to the statement. This should not include the WHERE keyword
+     * @param orderByClause         The columns/clause used for ordering the result rows. This should not include the ORDER BY keywords
+     * @param limit                 The value for the LIMIT clause (i.e. the number of rows to return)
+     * @param offset                The value for the OFFSET clause (i.e. the number of rows to skip)
+     * @param columnForPartitioning The (optional) column name that, if provided, the limit and offset values are based on values from the column itself (rather than the row number)
+     * @return A String containing a SQL SELECT statement with the given clauses applied
+     */
+    default String getSelectStatement(String tableName, String columnNames, String whereClause, String orderByClause, Long limit, Long offset, String columnForPartitioning) {
+        return getSelectStatement(tableName, columnNames, whereClause, orderByClause, limit, offset);
+    }
+
+    /**
      * <p>Returns a bare identifier string by removing wrapping escape characters
      * from identifier strings such as table and column names.</p>
      * <p>The default implementation of this method removes double quotes.

http://git-wip-us.apache.org/repos/asf/nifi/blob/0e09b98b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/db/impl/GenericDatabaseAdapter.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/db/impl/GenericDatabaseAdapter.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/db/impl/GenericDatabaseAdapter.java
index c663361..9efd9e0 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/db/impl/GenericDatabaseAdapter.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/db/impl/GenericDatabaseAdapter.java
@@ -35,6 +35,11 @@ public class GenericDatabaseAdapter implements DatabaseAdapter {
 
     @Override
     public String getSelectStatement(String tableName, String columnNames, String whereClause, String orderByClause, Long limit, Long offset) {
+        return getSelectStatement(tableName, columnNames, whereClause, orderByClause, limit, offset, null);
+    }
+
+    @Override
+    public String getSelectStatement(String tableName, String columnNames, String whereClause, String orderByClause, Long limit, Long offset, String columnForPartitioning) {
         if (StringUtils.isEmpty(tableName)) {
             throw new IllegalArgumentException("Table name cannot be null or empty");
         }
@@ -50,20 +55,33 @@ public class GenericDatabaseAdapter implements DatabaseAdapter {
         if (!StringUtils.isEmpty(whereClause)) {
             query.append(" WHERE ");
             query.append(whereClause);
+            if (!StringUtils.isEmpty(columnForPartitioning)) {
+                query.append(" AND ");
+                query.append(columnForPartitioning);
+                query.append(" >= ");
+                query.append(offset != null ? offset : "0");
+                if (limit != null) {
+                    query.append(" AND ");
+                    query.append(columnForPartitioning);
+                    query.append(" < ");
+                    query.append((offset == null ? 0 : offset) + limit);
+                }
+            }
         }
-        if (!StringUtils.isEmpty(orderByClause)) {
+        if (!StringUtils.isEmpty(orderByClause) && StringUtils.isEmpty(columnForPartitioning)) {
             query.append(" ORDER BY ");
             query.append(orderByClause);
         }
-        if (limit != null) {
-            query.append(" LIMIT ");
-            query.append(limit);
+        if (StringUtils.isEmpty(columnForPartitioning)) {
+            if (limit != null) {
+                query.append(" LIMIT ");
+                query.append(limit);
+            }
+            if (offset != null && offset > 0) {
+                query.append(" OFFSET ");
+                query.append(offset);
+            }
         }
-        if (offset != null && offset > 0) {
-            query.append(" OFFSET ");
-            query.append(offset);
-        }
-
         return query.toString();
     }
 }
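
For reference, a small usage sketch of the adapter change above. The table and column names and the limit/offset values are made up for illustration; the expected strings in the comments follow the GenericDatabaseAdapter logic shown in this hunk:

    import org.apache.nifi.processors.standard.db.impl.GenericDatabaseAdapter;

    public class GenericAdapterPagingSketch {
        public static void main(String[] args) {
            final GenericDatabaseAdapter adapter = new GenericDatabaseAdapter();

            // Existing row-number paging: ORDER BY plus LIMIT/OFFSET on the result set
            final String rowPaged = adapter.getSelectStatement(
                    "myTable", "*", "id > 0", "id", 10L, 20L);
            // Expected: SELECT * FROM myTable WHERE id > 0 ORDER BY id LIMIT 10 OFFSET 20

            // New column-value paging: limit and offset become bounds on the column's values,
            // and the ORDER BY / LIMIT / OFFSET clauses are skipped
            final String valuePaged = adapter.getSelectStatement(
                    "myTable", "*", "id > 0", "id", 10L, 100L, "id");
            // Expected: SELECT * FROM myTable WHERE id > 0 AND id >= 100 AND id < 110

            System.out.println(rowPaged);
            System.out.println(valuePaged);
        }
    }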

http://git-wip-us.apache.org/repos/asf/nifi/blob/0e09b98b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/db/impl/MSSQL2008DatabaseAdapter.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/db/impl/MSSQL2008DatabaseAdapter.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/db/impl/MSSQL2008DatabaseAdapter.java
index 3759cb4..610b34d 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/db/impl/MSSQL2008DatabaseAdapter.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/db/impl/MSSQL2008DatabaseAdapter.java
@@ -34,14 +34,19 @@ public class MSSQL2008DatabaseAdapter extends MSSQLDatabaseAdapter {
 
     @Override
     public String getSelectStatement(String tableName, String columnNames, String whereClause, String orderByClause, Long limit, Long offset) {
+        return getSelectStatement(tableName, columnNames, whereClause, orderByClause, limit, offset, null);
+    }
+
+    @Override
+    public String getSelectStatement(String tableName, String columnNames, String whereClause, String orderByClause, Long limit, Long offset, String columnForPartitioning) {
         if (StringUtils.isEmpty(tableName)) {
             throw new IllegalArgumentException("Table name cannot be null or empty");
         }
 
         final StringBuilder query = new StringBuilder("SELECT ");
-
+        boolean useColumnForPartitioning = !StringUtils.isEmpty(columnForPartitioning);
         // If this is a limit query and not a paging query then use TOP in MS SQL
-        if (limit != null) {
+        if (limit != null && !useColumnForPartitioning) {
 
             if (offset != null) {
                 query.append("* FROM (SELECT ");
@@ -60,7 +65,7 @@ public class MSSQL2008DatabaseAdapter extends MSSQLDatabaseAdapter {
             query.append(columnNames);
         }
 
-        if (limit != null && offset != null && orderByClause != null) {
+        if (limit != null && offset != null && orderByClause != null && !useColumnForPartitioning) {
             query.append(", ROW_NUMBER() OVER(ORDER BY ");
             query.append(orderByClause);
             query.append(" asc) rnum");
@@ -71,14 +76,26 @@ public class MSSQL2008DatabaseAdapter extends MSSQLDatabaseAdapter {
         if (!StringUtils.isEmpty(whereClause)) {
             query.append(" WHERE ");
             query.append(whereClause);
+            if (useColumnForPartitioning) {
+                query.append(" AND ");
+                query.append(columnForPartitioning);
+                query.append(" >= ");
+                query.append(offset != null ? offset : "0");
+                if (limit != null) {
+                    query.append(" AND ");
+                    query.append(columnForPartitioning);
+                    query.append(" < ");
+                    query.append((offset == null ? 0 : offset) + limit);
+                }
+            }
         }
 
-        if (!StringUtils.isEmpty(orderByClause)) {
+        if (!StringUtils.isEmpty(orderByClause) && !useColumnForPartitioning) {
             query.append(" ORDER BY ");
             query.append(orderByClause);
         }
 
-        if (limit != null && offset != null) {
+        if (limit != null && offset != null && !useColumnForPartitioning) {
             query.append(") A WHERE rnum > ");
             query.append(offset);
             query.append(" AND rnum <= ");

http://git-wip-us.apache.org/repos/asf/nifi/blob/0e09b98b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/db/impl/MSSQLDatabaseAdapter.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/db/impl/MSSQLDatabaseAdapter.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/db/impl/MSSQLDatabaseAdapter.java
index 3d23d9f..ec276fc 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/db/impl/MSSQLDatabaseAdapter.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/db/impl/MSSQLDatabaseAdapter.java
@@ -35,13 +35,18 @@ public class MSSQLDatabaseAdapter implements DatabaseAdapter {
 
     @Override
     public String getSelectStatement(String tableName, String columnNames, String whereClause, String orderByClause, Long limit, Long offset) {
+        return getSelectStatement(tableName, columnNames, whereClause, orderByClause, limit, offset, null);
+    }
+
+    @Override
+    public String getSelectStatement(String tableName, String columnNames, String whereClause, String orderByClause, Long limit, Long offset, String columnForPartitioning) {
         if (StringUtils.isEmpty(tableName)) {
             throw new IllegalArgumentException("Table name cannot be null or empty");
         }
         final StringBuilder query = new StringBuilder("SELECT ");
 
         //If this is a limit query and not a paging query then use TOP in MS SQL
-        if (limit != null && offset == null){
+        if (limit != null && offset == null && StringUtils.isEmpty(columnForPartitioning)){
             query.append("TOP ");
             query.append(limit);
             query.append(" ");
@@ -58,23 +63,37 @@ public class MSSQLDatabaseAdapter implements DatabaseAdapter {
         if (!StringUtils.isEmpty(whereClause)) {
             query.append(" WHERE ");
             query.append(whereClause);
+            if (!StringUtils.isEmpty(columnForPartitioning)) {
+                query.append(" AND ");
+                query.append(columnForPartitioning);
+                query.append(" >= ");
+                query.append(offset != null ? offset : "0");
+                if (limit != null) {
+                    query.append(" AND ");
+                    query.append(columnForPartitioning);
+                    query.append(" < ");
+                    query.append((offset == null ? 0 : offset) + limit);
+                }
+            }
         }
-        if (!StringUtils.isEmpty(orderByClause)) {
+        if (!StringUtils.isEmpty(orderByClause) && StringUtils.isEmpty(columnForPartitioning)) {
             query.append(" ORDER BY ");
             query.append(orderByClause);
         }
-        if (offset != null && limit != null && limit > 0) {
-            if (StringUtils.isEmpty(orderByClause)) {
-                throw new IllegalArgumentException("Order by clause cannot be null or empty when using row paging");
-            }
+        if (StringUtils.isEmpty(columnForPartitioning)) {
+            if (offset != null && limit != null && limit > 0) {
+                if (StringUtils.isEmpty(orderByClause)) {
+                    throw new IllegalArgumentException("Order by clause cannot be null or empty when using row paging");
+                }
 
-            query.append(" OFFSET ");
-            query.append(offset);
-            query.append(" ROWS");
+                query.append(" OFFSET ");
+                query.append(offset);
+                query.append(" ROWS");
 
-            query.append(" FETCH NEXT ");
-            query.append(limit);
-            query.append(" ROWS ONLY");
+                query.append(" FETCH NEXT ");
+                query.append(limit);
+                query.append(" ROWS ONLY");
+            }
         }
 
         return query.toString();

http://git-wip-us.apache.org/repos/asf/nifi/blob/0e09b98b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/db/impl/Oracle12DatabaseAdapter.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/db/impl/Oracle12DatabaseAdapter.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/db/impl/Oracle12DatabaseAdapter.java
index 77d92e1..c128ed1 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/db/impl/Oracle12DatabaseAdapter.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/db/impl/Oracle12DatabaseAdapter.java
@@ -35,6 +35,11 @@ public class Oracle12DatabaseAdapter implements DatabaseAdapter {
 
     @Override
     public String getSelectStatement(String tableName, String columnNames, String whereClause, String orderByClause, Long limit, Long offset) {
+        return getSelectStatement(tableName, columnNames, whereClause, orderByClause, limit, offset, null);
+    }
+
+    @Override
+    public String getSelectStatement(String tableName, String columnNames, String whereClause, String orderByClause, Long limit, Long offset, String columnForPartitioning) {
         if (StringUtils.isEmpty(tableName)) {
             throw new IllegalArgumentException("Table name cannot be null or empty");
         }
@@ -51,20 +56,34 @@ public class Oracle12DatabaseAdapter implements DatabaseAdapter {
         if (!StringUtils.isEmpty(whereClause)) {
             query.append(" WHERE ");
             query.append(whereClause);
+            if (!StringUtils.isEmpty(columnForPartitioning)) {
+                query.append(" AND ");
+                query.append(columnForPartitioning);
+                query.append(" >= ");
+                query.append(offset != null ? offset : "0");
+                if (limit != null) {
+                    query.append(" AND ");
+                    query.append(columnForPartitioning);
+                    query.append(" < ");
+                    query.append((offset == null ? 0 : offset) + limit);
+                }
+            }
         }
-        if (!StringUtils.isEmpty(orderByClause)) {
+        if (!StringUtils.isEmpty(orderByClause) && StringUtils.isEmpty(columnForPartitioning)) {
             query.append(" ORDER BY ");
             query.append(orderByClause);
         }
-        if (offset != null && offset > 0) {
-            query.append(" OFFSET ");
-            query.append(offset);
-            query.append(" ROWS");
-        }
-        if (limit != null) {
-            query.append(" FETCH NEXT ");
-            query.append(limit);
-            query.append(" ROWS ONLY");
+        if (StringUtils.isEmpty(columnForPartitioning)) {
+            if (offset != null && offset > 0) {
+                query.append(" OFFSET ");
+                query.append(offset);
+                query.append(" ROWS");
+            }
+            if (limit != null) {
+                query.append(" FETCH NEXT ");
+                query.append(limit);
+                query.append(" ROWS ONLY");
+            }
         }
 
         return query.toString();

http://git-wip-us.apache.org/repos/asf/nifi/blob/0e09b98b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/db/impl/OracleDatabaseAdapter.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/db/impl/OracleDatabaseAdapter.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/db/impl/OracleDatabaseAdapter.java
index 7ea57cc..5a1f099 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/db/impl/OracleDatabaseAdapter.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/db/impl/OracleDatabaseAdapter.java
@@ -35,12 +35,17 @@ public class OracleDatabaseAdapter implements DatabaseAdapter {
 
     @Override
     public String getSelectStatement(String tableName, String columnNames, String whereClause, String orderByClause, Long limit, Long offset) {
+        return getSelectStatement(tableName, columnNames, whereClause, orderByClause, limit, offset, null);
+    }
+
+    @Override
+    public String getSelectStatement(String tableName, String columnNames, String whereClause, String orderByClause, Long limit, Long offset, String columnForPartitioning) {
         if (StringUtils.isEmpty(tableName)) {
             throw new IllegalArgumentException("Table name cannot be null or empty");
         }
 
         final StringBuilder query = new StringBuilder();
-        boolean nestedSelect = (limit != null || offset != null);
+        boolean nestedSelect = (limit != null || offset != null) && StringUtils.isEmpty(columnForPartitioning);
         if (nestedSelect) {
             // Need a nested SELECT query here in order to use ROWNUM to limit the results
             query.append("SELECT ");
@@ -64,8 +69,20 @@ public class OracleDatabaseAdapter implements DatabaseAdapter {
         if (!StringUtils.isEmpty(whereClause)) {
             query.append(" WHERE ");
             query.append(whereClause);
+            if (!StringUtils.isEmpty(columnForPartitioning)) {
+                query.append(" AND ");
+                query.append(columnForPartitioning);
+                query.append(" >= ");
+                query.append(offset != null ? offset : "0");
+                if (limit != null) {
+                    query.append(" AND ");
+                    query.append(columnForPartitioning);
+                    query.append(" < ");
+                    query.append((offset == null ? 0 : offset) + limit);
+                }
+            }
         }
-        if (!StringUtils.isEmpty(orderByClause)) {
+        if (!StringUtils.isEmpty(orderByClause) && StringUtils.isEmpty(columnForPartitioning)) {
             query.append(" ORDER BY ");
             query.append(orderByClause);
         }
@@ -82,6 +99,7 @@ public class OracleDatabaseAdapter implements DatabaseAdapter {
             query.append(") WHERE rnum > ");
             query.append(offsetVal);
         }
+
         return query.toString();
     }
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/0e09b98b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/db/impl/PhoenixDatabaseAdapter.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/db/impl/PhoenixDatabaseAdapter.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/db/impl/PhoenixDatabaseAdapter.java
index 92522db..a70d88a 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/db/impl/PhoenixDatabaseAdapter.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/db/impl/PhoenixDatabaseAdapter.java
@@ -36,8 +36,12 @@ public final class PhoenixDatabaseAdapter implements DatabaseAdapter {
     }
 
     @Override
-    public String getSelectStatement(String tableName, String columnNames, String whereClause, String orderByClause,
-            Long limit, Long offset) {
+    public String getSelectStatement(String tableName, String columnNames, String whereClause, String orderByClause, Long limit, Long offset) {
+        return getSelectStatement(tableName, columnNames, whereClause, orderByClause, limit, offset, null);
+    }
+
+    @Override
+    public String getSelectStatement(String tableName, String columnNames, String whereClause, String orderByClause, Long limit, Long offset, String columnForPartitioning) {
         if (StringUtils.isEmpty(tableName)) {
             throw new IllegalArgumentException("Table name cannot be null or empty");
         }
@@ -53,20 +57,34 @@ public final class PhoenixDatabaseAdapter implements DatabaseAdapter {
         if (!StringUtils.isEmpty(whereClause)) {
             query.append(" WHERE ");
             query.append(whereClause);
+            if (!StringUtils.isEmpty(columnForPartitioning)) {
+                query.append(" AND ");
+                query.append(columnForPartitioning);
+                query.append(" >= ");
+                query.append(offset != null ? offset : "0");
+                if (limit != null) {
+                    query.append(" AND ");
+                    query.append(columnForPartitioning);
+                    query.append(" < ");
+                    query.append((offset == null ? 0 : offset) + limit);
+                }
+            }
         }
-        if (!StringUtils.isEmpty(orderByClause)) {
+        if (!StringUtils.isEmpty(orderByClause) && StringUtils.isEmpty(columnForPartitioning)) {
             query.append(" ORDER BY ");
             query.append(orderByClause);
         }
-        if (limit != null) {
-            query.append(" LIMIT ");
-            query.append(limit);
-        }
-        if (offset != null && offset > 0) {
-            query.append(" OFFSET ");
-            query.append(offset);
-        }
 
+        if (StringUtils.isEmpty(columnForPartitioning)) {
+            if (limit != null) {
+                query.append(" LIMIT ");
+                query.append(limit);
+            }
+            if (offset != null && offset > 0) {
+                query.append(" OFFSET ");
+                query.append(offset);
+            }
+        }
         return query.toString();
     }
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/0e09b98b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/docs/org.apache.nifi.processors.standard.GenerateTableFetch/additionalDetails.html
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/docs/org.apache.nifi.processors.standard.GenerateTableFetch/additionalDetails.html b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/docs/org.apache.nifi.processors.standard.GenerateTableFetch/additionalDetails.html
new file mode 100644
index 0000000..6829311
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/docs/org.apache.nifi.processors.standard.GenerateTableFetch/additionalDetails.html
@@ -0,0 +1,69 @@
+<!DOCTYPE html>
+<html lang="en">
+    <!--
+      Licensed to the Apache Software Foundation (ASF) under one or more
+      contributor license agreements.  See the NOTICE file distributed with
+      this work for additional information regarding copyright ownership.
+      The ASF licenses this file to You under the Apache License, Version 2.0
+      (the "License"); you may not use this file except in compliance with
+      the License.  You may obtain a copy of the License at
+          http://www.apache.org/licenses/LICENSE-2.0
+      Unless required by applicable law or agreed to in writing, software
+      distributed under the License is distributed on an "AS IS" BASIS,
+      WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+      See the License for the specific language governing permissions and
+      limitations under the License.
+    -->
+    <head>
+        <meta charset="utf-8" />
+        <title>GenerateTableFetch</title>
+
+        <link rel="stylesheet" href="../../../../../css/component-usage.css" type="text/css" />
+    </head>
+
+    <body>
+    	<p>
+    		GenerateTableFetch uses its properties and the specified database connection to generate flow files
+			containing SQL statements that can be used to fetch "pages" (aka "partitions") of data from a table.
+            GenerateTableFetch executes a query to the database to determine the current row count and maximum value,
+            and if Maximum Value Columns are specified, will collect the count of rows whose values for the Maximum
+            Value Columns are larger than those last observed by GenerateTableFetch. This allows for incremental
+            fetching of "new" rows, rather than generating SQL to fetch the entire table each time. If no Maximum
+            Value Columns are set, then the processor will generate SQL to fetch the entire table each time.
+        </p>
+    	
+    	<p>
+    		In order to generate SQL that will fetch pages/partitions of data, by default GenerateTableFetch will
+            generate SQL that orders the data based on the Maximum Value Columns (if present) and utilize the row
+            numbers of the result set to determine each page. For example if the Maximum Value Column is an integer "id"
+            and the partition size is 10, then the SQL for the first page might be "SELECT * FROM myTable LIMIT 10" and
+            the second page might be "SELECT * FROM myTable OFFSET 10 LIMIT 10", and so on.
+    	</p>
+    	
+    	<p>
+			Ordering the data can be an expensive operation depending on the database, the number of rows, etc.
+            Alternatively, it is possible to specify a column whose values will be used to determine the pages, using
+            the Column for Value Partitioning property. If set, GenerateTableFetch will determine the minimum and
+            maximum values for the column, and uses the minimum value as the initial offset. The SQL to fetch a page is
+            then based on this initial offset and the total difference in values (i.e. maximum - minimum) divided by
+            the page size. For example, if the column "id" is used for value partitioning, and the column contains
+            values 100 to 200, then with a page size of 10 the SQL to fetch the first page might be "SELECT * FROM
+            myTable WHERE id >= 100 AND id < 110" and the second page might be "SELECT * FROM myTable WHERE id >= 110
+            AND id < 120", and so on.
+        </p>
+
+        <p>
+            It is important that the Column for Value Partitioning be set to a column whose type can be coerced to a
+            long integer (i.e. not date or timestamp), and that the column values are evenly distributed and not
+            sparse, for best performance. As a counterexample to the above, consider a column "id" whose values are 100,
+            2000, and 30000. If the Partition Size is 100, then the column values are relatively sparse, so the SQL
+            for the "second page" (see above example) will return zero rows, and so will every page until the value in
+            the query becomes "id >= 2000". Another counterexample is when the values are not uniformly distributed.
+            Consider a column "id" with values 100, 200, 201, 202, ... 299. Then the SQL for the first page (see above
+            example) will return one row with value id = 100, and the second page will return 100 rows with values 200
+            ... 299. This can cause inconsistent processing times downstream, as the pages may contain a very different
+            number of rows. For these reasons it is recommended to use a Column for Value Partitioning that is
+            sufficiently dense (not sparse) and fairly evenly distributed.
+    	</p>
+	</body>
+</html>
\ No newline at end of file
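
A quick worked illustration of the sparse-column caveat described in the documentation above, using the document's own sample values (id values 100, 2000 and 30000) and a Partition Size of 100:

    public class SparseColumnCaveatSketch {
        public static void main(String[] args) {
            final long min = 100L;      // smallest id value
            final long max = 30000L;    // largest id value
            final int partitionSize = 100;

            final long valueRangeSize = max - min + 1;                   // 29901
            final long pages = (valueRangeSize / partitionSize)
                    + (valueRangeSize % partitionSize == 0 ? 0 : 1);     // 300 generated queries

            // Only the pages whose value window actually contains 100, 2000 or 30000 return rows,
            // so roughly 297 of the 300 generated queries return nothing.
            System.out.println("Generated pages: " + pages);
        }
    }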

http://git-wip-us.apache.org/repos/asf/nifi/blob/0e09b98b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestGenerateTableFetch.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestGenerateTableFetch.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestGenerateTableFetch.java
index 253f4d0..f6c27fa 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestGenerateTableFetch.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestGenerateTableFetch.java
@@ -855,9 +855,9 @@ public class TestGenerateTableFetch {
 
     @Test
     public void testRidiculousRowCount() throws ClassNotFoundException, SQLException, InitializationException, IOException {
-        long rowCount= Long.parseLong(Integer.toString(Integer.MAX_VALUE)) + 100;
+        long rowCount = Long.parseLong(Integer.toString(Integer.MAX_VALUE)) + 100;
         int partitionSize = 1000000;
-        int expectedFileCount = (int)(rowCount/partitionSize) + 1;
+        int expectedFileCount = (int) (rowCount / partitionSize) + 1;
 
         Connection conn = mock(Connection.class);
         when(dbcp.getConnection()).thenReturn(conn);
@@ -867,7 +867,7 @@ public class TestGenerateTableFetch {
         ResultSet rs = mock(ResultSet.class);
         when(st.executeQuery(anyString())).thenReturn(rs);
         when(rs.next()).thenReturn(true);
-        when(rs.getInt(1)).thenReturn((int)rowCount);
+        when(rs.getInt(1)).thenReturn((int) rowCount);
         when(rs.getLong(1)).thenReturn(rowCount);
 
         final ResultSetMetaData resultSetMetaData = mock(ResultSetMetaData.class);
@@ -890,7 +890,7 @@ public class TestGenerateTableFetch {
         runner.assertAllFlowFilesTransferred(REL_SUCCESS, expectedFileCount);
         MockFlowFile flowFile = runner.getFlowFilesForRelationship(REL_SUCCESS).get(0);
         String query = new String(flowFile.toByteArray());
-        assertEquals("SELECT * FROM TEST_QUERY_DB_TABLE ORDER BY ID FETCH NEXT 1000000 ROWS ONLY", query);
+        assertEquals("SELECT * FROM TEST_QUERY_DB_TABLE WHERE 1=1 ORDER BY ID FETCH NEXT 1000000 ROWS ONLY", query);
         runner.clearTransferState();
     }
 
@@ -997,7 +997,7 @@ public class TestGenerateTableFetch {
 
         ResultSet resultSet = stmt.executeQuery(query);
         int numberRecordsFirstExecution = 0; // Should be three records
-        while(resultSet.next()) {
+        while (resultSet.next()) {
             numberRecordsFirstExecution++;
         }
         runner.clearTransferState();
@@ -1010,7 +1010,7 @@ public class TestGenerateTableFetch {
 
         resultSet = stmt.executeQuery(query);
         int numberRecordsSecondExecution = 0; // Should be three records
-        while(resultSet.next()) {
+        while (resultSet.next()) {
             numberRecordsSecondExecution++;
         }
 
@@ -1227,12 +1227,12 @@ public class TestGenerateTableFetch {
         runner.run(2);
         runner.assertAllFlowFilesTransferred(REL_SUCCESS, 2);
 
-        assertEquals(2,processor.columnTypeMap.size());
+        assertEquals(2, processor.columnTypeMap.size());
         runner.clearTransferState();
 
 
         // Remove one element from columnTypeMap to simulate it's re-cache partial state
-        Map.Entry<String,Integer> entry = processor.columnTypeMap.entrySet().iterator().next();
+        Map.Entry<String, Integer> entry = processor.columnTypeMap.entrySet().iterator().next();
         String key = entry.getKey();
         processor.columnTypeMap.remove(key);
 
@@ -1248,7 +1248,143 @@ public class TestGenerateTableFetch {
         // It should re-cache column type
         runner.run();
         runner.assertAllFlowFilesTransferred(REL_SUCCESS, 1);
-        assertEquals(2,processor.columnTypeMap.size());
+        assertEquals(2, processor.columnTypeMap.size());
+        runner.clearTransferState();
+    }
+
+    @Test
+    public void testUseColumnValuesForPartitioning() throws ClassNotFoundException, SQLException, InitializationException, IOException {
+
+        // load test data to database
+        final Connection con = ((DBCPService) runner.getControllerService("dbcp")).getConnection();
+        Statement stmt = con.createStatement();
+
+        try {
+            stmt.execute("drop table TEST_QUERY_DB_TABLE");
+        } catch (final SQLException sqle) {
+            // Ignore this error, probably a "table does not exist" since Derby doesn't yet support DROP IF EXISTS [DERBY-4842]
+        }
+
+        stmt.execute("create table TEST_QUERY_DB_TABLE (id integer not null, name varchar(100), scale float, created_on timestamp, bignum bigint default 0)");
+        stmt.execute("insert into TEST_QUERY_DB_TABLE (id, name, scale, created_on) VALUES (10, 'Joe Smith', 1.0, '1962-09-23 03:23:34.234')");
+        stmt.execute("insert into TEST_QUERY_DB_TABLE (id, name, scale, created_on) VALUES (11, 'Carrie Jones', 5.0, '2000-01-01 03:23:34.234')");
+        stmt.execute("insert into TEST_QUERY_DB_TABLE (id, name, scale, created_on) VALUES (12, NULL, 2.0, '2010-01-01 00:00:00')");
+
+        runner.setProperty(GenerateTableFetch.TABLE_NAME, "TEST_QUERY_DB_TABLE");
+        runner.setIncomingConnection(false);
+        runner.setProperty(GenerateTableFetch.MAX_VALUE_COLUMN_NAMES, "ID");
+        runner.setProperty(GenerateTableFetch.COLUMN_FOR_VALUE_PARTITIONING, "ID");
+        runner.setProperty(GenerateTableFetch.PARTITION_SIZE, "2");
+
+        runner.run();
+        runner.assertAllFlowFilesTransferred(REL_SUCCESS, 2);
+        // First flow file
+        MockFlowFile flowFile = runner.getFlowFilesForRelationship(REL_SUCCESS).get(0);
+        String query = new String(flowFile.toByteArray());
+        assertEquals("SELECT * FROM TEST_QUERY_DB_TABLE WHERE ID <= 12 AND ID >= 10 AND ID < 12", query);
+        ResultSet resultSet = stmt.executeQuery(query);
+        // Should be two records
+        assertTrue(resultSet.next());
+        assertTrue(resultSet.next());
+        assertFalse(resultSet.next());
+        // Second flow file
+        flowFile = runner.getFlowFilesForRelationship(REL_SUCCESS).get(1);
+        query = new String(flowFile.toByteArray());
+        assertEquals("SELECT * FROM TEST_QUERY_DB_TABLE WHERE ID <= 12 AND ID >= 12 AND ID < 14", query);
+        resultSet = stmt.executeQuery(query);
+        // Should be one record
+        assertTrue(resultSet.next());
+        assertFalse(resultSet.next());
+        runner.clearTransferState();
+
+        // Run again, this time no flowfiles/rows should be transferred
+        runner.run();
+        runner.assertAllFlowFilesTransferred(REL_SUCCESS, 0);
+        runner.clearTransferState();
+
+        // Add 3 new rows with a higher ID and run with a partition size of 2. Three flow files should be transferred
+        stmt.execute("insert into TEST_QUERY_DB_TABLE (id, name, scale, created_on) VALUES (20, 'Mary West', 15.0, '2000-01-01 03:23:34.234')");
+        stmt.execute("insert into TEST_QUERY_DB_TABLE (id, name, scale, created_on) VALUES (21, 'Marty Johnson', 15.0, '2011-01-01 03:23:34.234')");
+        stmt.execute("insert into TEST_QUERY_DB_TABLE (id, name, scale, created_on) VALUES (24, 'Marty Johnson', 15.0, '2011-01-01 03:23:34.234')");
+        runner.run();
+        runner.assertAllFlowFilesTransferred(REL_SUCCESS, 3);
+
+        // Verify first flow file's contents
+        flowFile = runner.getFlowFilesForRelationship(REL_SUCCESS).get(0);
+        query = new String(flowFile.toByteArray());
+        assertEquals("SELECT * FROM TEST_QUERY_DB_TABLE WHERE ID > 12 AND ID <= 24 AND ID >= 20 AND ID < 22", query);
+        resultSet = stmt.executeQuery(query);
+        // Should be two records
+        assertTrue(resultSet.next());
+        assertTrue(resultSet.next());
+        assertFalse(resultSet.next());
+
+        // Verify second flow file's contents
+        flowFile = runner.getFlowFilesForRelationship(REL_SUCCESS).get(1);
+        query = new String(flowFile.toByteArray());
+        assertEquals("SELECT * FROM TEST_QUERY_DB_TABLE WHERE ID > 12 AND ID <= 24 AND ID >= 22 AND ID < 24", query);
+        resultSet = stmt.executeQuery(query);
+        // Should be no records
+        assertFalse(resultSet.next());
+
+        // Verify third flow file's contents
+        flowFile = runner.getFlowFilesForRelationship(REL_SUCCESS).get(2);
+        query = new String(flowFile.toByteArray());
+        assertEquals("SELECT * FROM TEST_QUERY_DB_TABLE WHERE ID > 12 AND ID <= 24 AND ID >= 24 AND ID < 26", query);
+        resultSet = stmt.executeQuery(query);
+        // Should be one record
+        assertTrue(resultSet.next());
+        assertFalse(resultSet.next());
+        runner.clearTransferState();
+    }
+
+    @Test
+    public void testUseColumnValuesForPartitioningNoMaxValueColumn() throws ClassNotFoundException, SQLException, InitializationException, IOException {
+
+        // load test data to database
+        final Connection con = ((DBCPService) runner.getControllerService("dbcp")).getConnection();
+        Statement stmt = con.createStatement();
+
+        try {
+            stmt.execute("drop table TEST_QUERY_DB_TABLE");
+        } catch (final SQLException sqle) {
+            // Ignore this error, probably a "table does not exist" since Derby doesn't yet support DROP IF EXISTS [DERBY-4842]
+        }
+
+        stmt.execute("create table TEST_QUERY_DB_TABLE (id integer not null, name varchar(100), scale float, created_on timestamp, bignum bigint default 0)");
+        stmt.execute("insert into TEST_QUERY_DB_TABLE (id, name, scale, created_on) VALUES (10, 'Joe Smith', 1.0, '1962-09-23 03:23:34.234')");
+        stmt.execute("insert into TEST_QUERY_DB_TABLE (id, name, scale, created_on) VALUES (11, 'Carrie Jones', 5.0, '2000-01-01 03:23:34.234')");
+        stmt.execute("insert into TEST_QUERY_DB_TABLE (id, name, scale, created_on) VALUES (12, NULL, 2.0, '2010-01-01 00:00:00')");
+
+        runner.setProperty(GenerateTableFetch.TABLE_NAME, "TEST_QUERY_DB_TABLE");
+        runner.setIncomingConnection(false);
+        runner.setProperty(GenerateTableFetch.COLUMN_FOR_VALUE_PARTITIONING, "ID");
+        runner.setProperty(GenerateTableFetch.PARTITION_SIZE, "2");
+
+        runner.run();
+        runner.assertAllFlowFilesTransferred(REL_SUCCESS, 2);
+        // First flow file
+        MockFlowFile flowFile = runner.getFlowFilesForRelationship(REL_SUCCESS).get(0);
+        String query = new String(flowFile.toByteArray());
+        assertEquals("SELECT * FROM TEST_QUERY_DB_TABLE WHERE 1=1 AND ID >= 10 AND ID < 12", query);
+        ResultSet resultSet = stmt.executeQuery(query);
+        // Should be two records
+        assertTrue(resultSet.next());
+        assertTrue(resultSet.next());
+        assertFalse(resultSet.next());
+        // Second flow file
+        flowFile = runner.getFlowFilesForRelationship(REL_SUCCESS).get(1);
+        query = new String(flowFile.toByteArray());
+        assertEquals("SELECT * FROM TEST_QUERY_DB_TABLE WHERE ID <= 12 AND ID >= 12", query);
+        resultSet = stmt.executeQuery(query);
+        // Should be one record
+        assertTrue(resultSet.next());
+        assertFalse(resultSet.next());
+        runner.clearTransferState();
+
+        // Run again, the same flowfiles should be transferred as we have no maximum-value column
+        runner.run();
+        runner.assertAllFlowFilesTransferred(REL_SUCCESS, 2);
         runner.clearTransferState();
     }
 

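The assertions above imply how the partition ranges are derived when a column is used for partitioning: half-open ranges of width PARTITION_SIZE, starting at the column's minimum value and walking up to its maximum. The following is only a sketch of that arithmetic, consistent with the asserted queries; the class and variable names are made up and the processor's actual logic lives in GenerateTableFetch.java, not here.

    public class PartitionRangeSketch {
        public static void main(String[] args) {
            // Values taken from the test data above: IDs 10..12, partition size 2
            long minId = 10, maxId = 12, partitionSize = 2;
            // One way to count the ranges that matches the assertions above
            long numPartitions = (maxId - minId) / partitionSize + 1;
            for (long i = 0; i < numPartitions; i++) {
                long lower = minId + i * partitionSize;   // inclusive lower bound
                long upper = lower + partitionSize;       // exclusive upper bound
                System.out.println("ID >= " + lower + " AND ID < " + upper);
            }
        }
    }

Run against the first data set this prints the two ranges asserted above (ID >= 10 AND ID < 12, then ID >= 12 AND ID < 14).
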
http://git-wip-us.apache.org/repos/asf/nifi/blob/0e09b98b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/db/impl/DerbyDatabaseAdapter.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/db/impl/DerbyDatabaseAdapter.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/db/impl/DerbyDatabaseAdapter.java
index 636f886..579e39e 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/db/impl/DerbyDatabaseAdapter.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/db/impl/DerbyDatabaseAdapter.java
@@ -36,6 +36,11 @@ public class DerbyDatabaseAdapter implements DatabaseAdapter {
 
     @Override
     public String getSelectStatement(String tableName, String columnNames, String whereClause, String orderByClause, Long limit, Long offset) {
+        return getSelectStatement(tableName, columnNames, whereClause, orderByClause, limit, offset, null);
+    }
+
+    @Override
+    public String getSelectStatement(String tableName, String columnNames, String whereClause, String orderByClause, Long limit, Long offset, String columnForPartitioning) {
         if (StringUtils.isEmpty(tableName)) {
             throw new IllegalArgumentException("Table name cannot be null or empty");
         }
@@ -51,21 +56,37 @@ public class DerbyDatabaseAdapter implements DatabaseAdapter {
         if (!StringUtils.isEmpty(whereClause)) {
             query.append(" WHERE ");
             query.append(whereClause);
+            if (!StringUtils.isEmpty(columnForPartitioning)) {
+                query.append(" AND ");
+                query.append(columnForPartitioning);
+                query.append(" >= ");
+                query.append(offset != null ? offset : "0");
+                if (limit != null) {
+                    query.append(" AND ");
+                    query.append(columnForPartitioning);
+                    query.append(" < ");
+                    query.append((offset == null ? 0 : offset) + limit);
+                }
+            }
         }
-        if (!StringUtils.isEmpty(orderByClause)) {
+
+        if (!StringUtils.isEmpty(orderByClause) && StringUtils.isEmpty(columnForPartitioning)) {
             query.append(" ORDER BY ");
             query.append(orderByClause);
+
         }
-        if (offset != null && offset > 0) {
-            query.append(" OFFSET ");
-            query.append(offset);
-            query.append(" ROWS");
-        }
+        if (StringUtils.isEmpty(columnForPartitioning)) {
+            if (offset != null && offset > 0) {
+                query.append(" OFFSET ");
+                query.append(offset);
+                query.append(" ROWS");
+            }
 
-        if (limit != null) {
-            query.append(" FETCH NEXT ");
-            query.append(limit);
-            query.append(" ROWS ONLY");
+            if (limit != null) {
+                query.append(" FETCH NEXT ");
+                query.append(limit);
+                query.append(" ROWS ONLY");
+            }
         }
 
         return query.toString();

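The Derby adapter above shows the shape of the new overload: when columnForPartitioning is supplied (and the where clause is non-empty), limit and offset are turned into a half-open range on that column, and the ORDER BY / OFFSET / FETCH NEXT clauses are skipped. A minimal sketch of calling it directly, assuming the test-scoped DerbyDatabaseAdapter is on the classpath as in this commit:

    import org.apache.nifi.processors.standard.db.DatabaseAdapter;
    import org.apache.nifi.processors.standard.db.impl.DerbyDatabaseAdapter;

    public class DerbyPartitioningSketch {
        public static void main(String[] args) {
            DatabaseAdapter adapter = new DerbyDatabaseAdapter();
            // offset is the partition's inclusive lower bound, limit its width
            String sql = adapter.getSelectStatement(
                    "TEST_QUERY_DB_TABLE", "id, name", "1=1", null, 2L, 10L, "ID");
            // Per the adapter code above this should print:
            // SELECT id, name FROM TEST_QUERY_DB_TABLE WHERE 1=1 AND ID >= 10 AND ID < 12
            System.out.println(sql);
        }
    }
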
http://git-wip-us.apache.org/repos/asf/nifi/blob/0e09b98b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/db/impl/TestMSSQL2008DatabaseAdapter.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/db/impl/TestMSSQL2008DatabaseAdapter.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/db/impl/TestMSSQL2008DatabaseAdapter.java
index 1390072..912a720 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/db/impl/TestMSSQL2008DatabaseAdapter.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/db/impl/TestMSSQL2008DatabaseAdapter.java
@@ -79,4 +79,28 @@ public class TestMSSQL2008DatabaseAdapter {
                 + "WHERE methods='strange' ORDER BY contain) A WHERE rnum > 123456 AND rnum <= 133456";
         Assert.assertEquals(expected3, sql);
     }
+
+    @Test
+    public void testPagingQueryUsingColumnValuesForPartitioning() {
+        String sql1 = db.getSelectStatement("database.tablename", "some(set),of(columns),that,might,contain,methods,a.*", "1=1", "contain",
+                100L, 0L, "contain");
+        String expected1 = "SELECT some(set),of(columns),that,might,contain,methods,a.* FROM database.tablename WHERE 1=1 AND contain >= 0 AND contain < 100";
+        Assert.assertEquals(expected1, sql1);
+
+        String sql2 = db.getSelectStatement("database.tablename", "some(set),of(columns),that,might,contain,methods,a.*", "1=1", "contain",
+                10000L, 123456L, "contain");
+        String expected2 = "SELECT some(set),of(columns),that,might,contain,methods,a.* FROM database.tablename WHERE 1=1 AND contain >= 123456 AND contain < 133456";
+        Assert.assertEquals(expected2, sql2);
+
+        String sql3 = db.getSelectStatement("database.tablename", "some(set),of(columns),that,might,contain,methods,a.*", "methods='strange'",
+                "contain", 10000L, 123456L, "contain");
+        String expected3 = "SELECT some(set),of(columns),that,might,contain,methods,a.* FROM database.tablename WHERE methods='strange' AND contain >= 123456 AND contain < 133456";
+        Assert.assertEquals(expected3, sql3);
+
+        // With column-value partitioning, limit/offset are expressed as a range predicate on the partitioning column appended to the WHERE clause; the where clause is empty here, so no range is added and the limit is not honored
+        String sql4 = db.getSelectStatement("database.tablename", "some(set),of(columns),that,might,contain,methods,a.*", "", "",
+                100L, null, "contain");
+        String expected4 = "SELECT some(set),of(columns),that,might,contain,methods,a.* FROM database.tablename";
+        Assert.assertEquals(expected4, sql4);
+    }
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/0e09b98b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/db/impl/TestMSSQLDatabaseAdapter.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/db/impl/TestMSSQLDatabaseAdapter.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/db/impl/TestMSSQLDatabaseAdapter.java
index 9c0a0ff..de33753 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/db/impl/TestMSSQLDatabaseAdapter.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/db/impl/TestMSSQLDatabaseAdapter.java
@@ -81,4 +81,28 @@ public class TestMSSQLDatabaseAdapter {
         String expected3 = "SELECT some(set),of(columns),that,might,contain,methods,a.* FROM database.tablename WHERE methods='strange' ORDER BY contain OFFSET 123456 ROWS FETCH NEXT 10000 ROWS ONLY";
         Assert.assertEquals(sql3,expected3);
     }
+
+    @Test
+    public void testPagingQueryUsingColumnValuesForPartitioning() {
+        String sql1 = db.getSelectStatement("database.tablename", "some(set),of(columns),that,might,contain,methods,a.*", "1=1", "contain",
+                100L, 0L, "contain");
+        String expected1 = "SELECT some(set),of(columns),that,might,contain,methods,a.* FROM database.tablename WHERE 1=1 AND contain >= 0 AND contain < 100";
+        Assert.assertEquals(expected1, sql1);
+
+        String sql2 = db.getSelectStatement("database.tablename", "some(set),of(columns),that,might,contain,methods,a.*", "1=1", "contain",
+                10000L, 123456L, "contain");
+        String expected2 = "SELECT some(set),of(columns),that,might,contain,methods,a.* FROM database.tablename WHERE 1=1 AND contain >= 123456 AND contain < 133456";
+        Assert.assertEquals(expected2, sql2);
+
+        String sql3 = db.getSelectStatement("database.tablename", "some(set),of(columns),that,might,contain,methods,a.*", "methods='strange'",
+                "contain", 10000L, 123456L, "contain");
+        String expected3 = "SELECT some(set),of(columns),that,might,contain,methods,a.* FROM database.tablename WHERE methods='strange' AND contain >= 123456 AND contain < 133456";
+        Assert.assertEquals(expected3, sql3);
+
+        // With column-value partitioning, limit/offset are expressed as a range predicate on the partitioning column appended to the WHERE clause; the where clause is empty here, so no range is added and the limit is not honored
+        String sql4 = db.getSelectStatement("database.tablename", "some(set),of(columns),that,might,contain,methods,a.*", "", "",
+                100L, null, "contain");
+        String expected4 = "SELECT some(set),of(columns),that,might,contain,methods,a.* FROM database.tablename";
+        Assert.assertEquals(expected4, sql4);
+    }
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/0e09b98b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/db/impl/TestOracle12DatabaseAdapter.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/db/impl/TestOracle12DatabaseAdapter.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/db/impl/TestOracle12DatabaseAdapter.java
index 2a80ab2..2315e98 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/db/impl/TestOracle12DatabaseAdapter.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/db/impl/TestOracle12DatabaseAdapter.java
@@ -26,45 +26,64 @@ public class TestOracle12DatabaseAdapter {
 
     @Test
     public void testGeneration() throws Exception {
-        String sql1 = db.getSelectStatement("database.tablename", "some(set),of(columns),that,might,contain,methods,a.*","","",null,null);
+        String sql1 = db.getSelectStatement("database.tablename", "some(set),of(columns),that,might,contain,methods,a.*", "", "", null, null);
         String expected1 = "SELECT some(set),of(columns),that,might,contain,methods,a.* FROM database.tablename";
-        Assert.assertEquals(sql1,expected1);
+        Assert.assertEquals(sql1, expected1);
 
-        String sql2 = db.getSelectStatement("database.tablename", "some(set),of(columns),that,might,contain,methods,a.*","that=\'some\"\' value\'","",null,null);
+        String sql2 = db.getSelectStatement("database.tablename", "some(set),of(columns),that,might,contain,methods,a.*", "that=\'some\"\' value\'", "", null, null);
         String expected2 = "SELECT some(set),of(columns),that,might,contain,methods,a.* FROM database.tablename WHERE that=\'some\"\' value\'";
-        Assert.assertEquals(sql2,expected2);
+        Assert.assertEquals(sql2, expected2);
 
-        String sql3 = db.getSelectStatement("database.tablename", "some(set),of(columns),that,might,contain,methods,a.*","that=\'some\"\' value\'","might DESC",null,null);
+        String sql3 = db.getSelectStatement("database.tablename", "some(set),of(columns),that,might,contain,methods,a.*", "that=\'some\"\' value\'", "might DESC", null, null);
         String expected3 = "SELECT some(set),of(columns),that,might,contain,methods,a.* FROM database.tablename WHERE that=\'some\"\' value\' ORDER BY might DESC";
-        Assert.assertEquals(sql3,expected3);
+        Assert.assertEquals(sql3, expected3);
 
-        String sql4 = db.getSelectStatement("database.tablename", "","that=\'some\"\' value\'","might DESC",null,null);
+        String sql4 = db.getSelectStatement("database.tablename", "", "that=\'some\"\' value\'", "might DESC", null, null);
         String expected4 = "SELECT * FROM database.tablename WHERE that=\'some\"\' value\' ORDER BY might DESC";
-        Assert.assertEquals(sql4,expected4);
+        Assert.assertEquals(sql4, expected4);
     }
 
     @Test(expected = IllegalArgumentException.class)
     public void testNoTableName() throws Exception {
-        db.getSelectStatement("", "some(set),of(columns),that,might,contain,methods,a.*","","",null,null);
+        db.getSelectStatement("", "some(set),of(columns),that,might,contain,methods,a.*", "", "", null, null);
     }
 
     @Test
     public void testPagingQuery() throws Exception {
-        String sql1 = db.getSelectStatement("database.tablename", "some(set),of(columns),that,might,contain,methods,a.*","","contain",100L,0L);
+        String sql1 = db.getSelectStatement("database.tablename", "some(set),of(columns),that,might,contain,methods,a.*", "", "contain", 100L, 0L);
         String expected1 = "SELECT some(set),of(columns),that,might,contain,methods,a.* FROM database.tablename ORDER BY contain FETCH NEXT 100 ROWS ONLY";
-        Assert.assertEquals(sql1,expected1);
+        Assert.assertEquals(sql1, expected1);
 
-        String sql2 = db.getSelectStatement("database.tablename", "some(set),of(columns),that,might,contain,methods,a.*","","contain",10000L,123456L);
+        String sql2 = db.getSelectStatement("database.tablename", "some(set),of(columns),that,might,contain,methods,a.*", "", "contain", 10000L, 123456L);
         String expected2 = "SELECT some(set),of(columns),that,might,contain,methods,a.* FROM database.tablename ORDER BY contain OFFSET 123456 ROWS FETCH NEXT 10000 ROWS ONLY";
-        Assert.assertEquals(sql2,expected2);
+        Assert.assertEquals(sql2, expected2);
 
-        String sql3 = db.getSelectStatement("database.tablename", "some(set),of(columns),that,might,contain,methods,a.*","methods='strange'","contain",10000L,123456L);
+        String sql3 = db.getSelectStatement("database.tablename", "some(set),of(columns),that,might,contain,methods,a.*", "methods='strange'", "contain", 10000L, 123456L);
         String expected3 = "SELECT some(set),of(columns),that,might,contain,methods,a.* FROM database.tablename WHERE methods='strange' ORDER BY contain OFFSET 123456 ROWS FETCH NEXT 10000 ROWS ONLY";
-        Assert.assertEquals(sql3,expected3);
+        Assert.assertEquals(sql3, expected3);
 
-        String sql4 = db.getSelectStatement("database.tablename", "some(set),of(columns),that,might,contain,methods,a.*","","",100L,null);
+        String sql4 = db.getSelectStatement("database.tablename", "some(set),of(columns),that,might,contain,methods,a.*", "", "", 100L, null);
         String expected4 = "SELECT some(set),of(columns),that,might,contain,methods,a.* FROM database.tablename FETCH NEXT 100 ROWS ONLY";
-        Assert.assertEquals(sql4,expected4);
+        Assert.assertEquals(sql4, expected4);
     }
 
+    @Test
+    public void testPagingQueryUsingColumnValuesForPartitioning() throws Exception {
+        String sql1 = db.getSelectStatement("database.tablename", "some(set),of(columns),that,might,contain,methods,a.*", "1=1", "contain", 100L, 0L, "contain");
+        String expected1 = "SELECT some(set),of(columns),that,might,contain,methods,a.* FROM database.tablename WHERE 1=1 AND contain >= 0 AND contain < 100";
+        Assert.assertEquals(expected1, sql1);
+
+        String sql2 = db.getSelectStatement("database.tablename", "some(set),of(columns),that,might,contain,methods,a.*", "1=1", "contain", 10000L, 123456L, "contain");
+        String expected2 = "SELECT some(set),of(columns),that,might,contain,methods,a.* FROM database.tablename WHERE 1=1 AND contain >= 123456 AND contain < 133456";
+        Assert.assertEquals(expected2, sql2);
+
+        String sql3 = db.getSelectStatement("database.tablename", "some(set),of(columns),that,might,contain,methods,a.*", "methods='strange'", "contain", 10000L, 123456L, "contain");
+        String expected3 = "SELECT some(set),of(columns),that,might,contain,methods,a.* FROM database.tablename WHERE methods='strange' AND contain >= 123456 AND contain < 133456";
+        Assert.assertEquals(expected3, sql3);
+
+        // With column-value partitioning, limit/offset are expressed as a range predicate on the partitioning column appended to the WHERE clause; the where clause is empty here, so no range is added and the limit is not honored
+        String sql4 = db.getSelectStatement("database.tablename", "some(set),of(columns),that,might,contain,methods,a.*", "", "", 100L, null, "contain");
+        String expected4 = "SELECT some(set),of(columns),that,might,contain,methods,a.* FROM database.tablename";
+        Assert.assertEquals(expected4, sql4);
+    }
 }
\ No newline at end of file

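The Oracle 12 tests above contrast the two paging strategies: row-based paging (ORDER BY with OFFSET ... FETCH NEXT) versus column-value partitioning, where the same limit and offset become a plain range predicate. A minimal sketch of the two calls side by side, assuming an Oracle12DatabaseAdapter instance like the tests' db field; the expected shapes follow the assertion strings above, with a different column list:

    import org.apache.nifi.processors.standard.db.DatabaseAdapter;
    import org.apache.nifi.processors.standard.db.impl.Oracle12DatabaseAdapter;

    public class Oracle12PagingContrast {
        public static void main(String[] args) {
            DatabaseAdapter db = new Oracle12DatabaseAdapter();
            // Row-based paging: ORDER BY id OFFSET 123456 ROWS FETCH NEXT 10000 ROWS ONLY
            System.out.println(db.getSelectStatement("database.tablename", "id, name", "", "id", 10000L, 123456L));
            // Column-value partitioning: WHERE 1=1 AND id >= 123456 AND id < 133456
            System.out.println(db.getSelectStatement("database.tablename", "id, name", "1=1", "id", 10000L, 123456L, "id"));
        }
    }
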
http://git-wip-us.apache.org/repos/asf/nifi/blob/0e09b98b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/db/impl/TestOracleDatabaseAdapter.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/db/impl/TestOracleDatabaseAdapter.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/db/impl/TestOracleDatabaseAdapter.java
new file mode 100644
index 0000000..d33b03c
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/db/impl/TestOracleDatabaseAdapter.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.standard.db.impl;
+
+import org.apache.nifi.processors.standard.db.DatabaseAdapter;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class TestOracleDatabaseAdapter {
+
+    private final DatabaseAdapter db = new OracleDatabaseAdapter();
+
+    @Test
+    public void testGeneration() {
+        String sql1 = db.getSelectStatement("database.tablename", "some(set),of(columns),that,might,contain,methods,a.*", "", "", null, null);
+        String expected1 = "SELECT some(set),of(columns),that,might,contain,methods,a.* FROM database.tablename";
+        Assert.assertEquals(expected1, sql1);
+
+        String sql2 = db.getSelectStatement("database.tablename", "some(set),of(columns),that,might,contain,methods,a.*", "that=\'some\"\' value\'", "", null, null);
+        String expected2 = "SELECT some(set),of(columns),that,might,contain,methods,a.* FROM database.tablename WHERE that=\'some\"\' value\'";
+        Assert.assertEquals(expected2, sql2);
+
+        String sql3 = db.getSelectStatement("database.tablename", "some(set),of(columns),that,might,contain,methods,a.*", "that=\'some\"\' value\'", "might DESC", null, null);
+        String expected3 = "SELECT some(set),of(columns),that,might,contain,methods,a.* FROM database.tablename WHERE that=\'some\"\' value\' ORDER BY might DESC";
+        Assert.assertEquals(expected3, sql3);
+
+        String sql4 = db.getSelectStatement("database.tablename", "", "that=\'some\"\' value\'", "might DESC", null, null);
+        String expected4 = "SELECT * FROM database.tablename WHERE that=\'some\"\' value\' ORDER BY might DESC";
+        Assert.assertEquals(expected4, sql4);
+    }
+
+    @Test(expected = IllegalArgumentException.class)
+    public void testNoTableName() throws IllegalArgumentException {
+        db.getSelectStatement("", "some(set),of(columns),that,might,contain,methods,a.*", "", "", null, null);
+    }
+
+    @Test
+    public void testPagingQuery() {
+        String sql1 = db.getSelectStatement("database.tablename", "some(set),of(columns),that,might,contain,methods,a.*", "", "contain",
+                100L, 0L);
+        String expected1 = "SELECT some(set),of(columns),that,might,contain,methods,a.* FROM (SELECT a.*, ROWNUM rnum FROM (SELECT some(set),of(columns),that,might,contain,methods,a.* "
+                + "FROM database.tablename ORDER BY contain) a WHERE ROWNUM <= 100) WHERE rnum > 0";
+        Assert.assertEquals(expected1, sql1);
+
+        String sql2 = db.getSelectStatement("database.tablename", "some(set),of(columns),that,might,contain,methods,a.*", "", "contain",
+                10000L, 123456L);
+        String expected2 = "SELECT some(set),of(columns),that,might,contain,methods,a.* FROM (SELECT a.*, ROWNUM rnum FROM (SELECT some(set),of(columns),that,might,contain,methods,a.* "
+                + "FROM database.tablename ORDER BY contain) a WHERE ROWNUM <= 133456) WHERE rnum > 123456";
+        Assert.assertEquals(expected2, sql2);
+
+        String sql3 = db.getSelectStatement("database.tablename", "some(set),of(columns),that,might,contain,methods,a.*", "methods='strange'",
+                "contain", 10000L, 123456L);
+        String expected3 = "SELECT some(set),of(columns),that,might,contain,methods,a.* FROM (SELECT a.*, ROWNUM rnum FROM (SELECT some(set),of(columns),that,might,contain,methods,a.* "
+                + "FROM database.tablename WHERE methods='strange' ORDER BY contain) a WHERE ROWNUM <= 133456) WHERE rnum > 123456";
+        Assert.assertEquals(expected3, sql3);
+
+        String sql4 = db.getSelectStatement("database.tablename", "some(set),of(columns),that,might,contain,methods,a.*", "", "",
+                100L, null);
+        String expected4 = "SELECT some(set),of(columns),that,might,contain,methods,a.* FROM (SELECT a.*, ROWNUM rnum FROM (SELECT some(set),of(columns),that,might,contain,methods,a.* "
+                + "FROM database.tablename) a WHERE ROWNUM <= 100) WHERE rnum > 0";
+        Assert.assertEquals(expected4, sql4);
+    }
+
+    @Test
+    public void testPagingQueryUsingColumnValuesForPartitioning() {
+        String sql1 = db.getSelectStatement("database.tablename", "some(set),of(columns),that,might,contain,methods,a.*", "1=1", "contain",
+                100L, 0L, "contain");
+        String expected1 = "SELECT some(set),of(columns),that,might,contain,methods,a.* FROM database.tablename WHERE 1=1 AND contain >= 0 AND contain < 100";
+        Assert.assertEquals(expected1, sql1);
+
+        String sql2 = db.getSelectStatement("database.tablename", "some(set),of(columns),that,might,contain,methods,a.*", "1=1", "contain",
+                10000L, 123456L, "contain");
+        String expected2 = "SELECT some(set),of(columns),that,might,contain,methods,a.* FROM database.tablename WHERE 1=1 AND contain >= 123456 AND contain < 133456";
+        Assert.assertEquals(expected2, sql2);
+
+        String sql3 = db.getSelectStatement("database.tablename", "some(set),of(columns),that,might,contain,methods,a.*", "methods='strange'",
+                "contain", 10000L, 123456L, "contain");
+        String expected3 = "SELECT some(set),of(columns),that,might,contain,methods,a.* FROM database.tablename WHERE methods='strange' AND contain >= 123456 AND contain < 133456";
+        Assert.assertEquals(expected3, sql3);
+
+        // With column-value partitioning, limit/offset are expressed as a range predicate on the partitioning column appended to the WHERE clause; the where clause is empty here, so no range is added and the limit is not honored
+        String sql4 = db.getSelectStatement("database.tablename", "some(set),of(columns),that,might,contain,methods,a.*", "", "",
+                100L, null, "contain");
+        String expected4 = "SELECT some(set),of(columns),that,might,contain,methods,a.* FROM database.tablename";
+        Assert.assertEquals(expected4, sql4);
+    }
+}
\ No newline at end of file