You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by pv...@apache.org on 2018/06/08 07:36:37 UTC
nifi git commit: NIFI-5143: Initial work to support column values for
paging results
Repository: nifi
Updated Branches:
refs/heads/master 729f8aa24 -> 0e09b98b0
NIFI-5143: Initial work to support column values for paging results
Signed-off-by: Pierre Villard <pi...@gmail.com>
This closes #2728.
Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/0e09b98b
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/0e09b98b
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/0e09b98b
Branch: refs/heads/master
Commit: 0e09b98b02bcfae21841cb7dda176c7c1c33f4d8
Parents: 729f8aa
Author: Matthew Burgess <ma...@apache.org>
Authored: Mon May 21 13:45:16 2018 -0400
Committer: Pierre Villard <pi...@gmail.com>
Committed: Fri Jun 8 09:36:27 2018 +0200
----------------------------------------------------------------------
.../processors/standard/GenerateTableFetch.java | 100 +++++++++---
.../processors/standard/db/DatabaseAdapter.java | 17 ++
.../db/impl/GenericDatabaseAdapter.java | 36 +++--
.../db/impl/MSSQL2008DatabaseAdapter.java | 27 +++-
.../standard/db/impl/MSSQLDatabaseAdapter.java | 43 ++++--
.../db/impl/Oracle12DatabaseAdapter.java | 39 +++--
.../standard/db/impl/OracleDatabaseAdapter.java | 22 ++-
.../db/impl/PhoenixDatabaseAdapter.java | 40 +++--
.../additionalDetails.html | 69 +++++++++
.../standard/TestGenerateTableFetch.java | 154 +++++++++++++++++--
.../standard/db/impl/DerbyDatabaseAdapter.java | 41 +++--
.../db/impl/TestMSSQL2008DatabaseAdapter.java | 24 +++
.../db/impl/TestMSSQLDatabaseAdapter.java | 24 +++
.../db/impl/TestOracle12DatabaseAdapter.java | 53 +++++--
.../db/impl/TestOracleDatabaseAdapter.java | 101 ++++++++++++
15 files changed, 685 insertions(+), 105 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/nifi/blob/0e09b98b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GenerateTableFetch.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GenerateTableFetch.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GenerateTableFetch.java
index a2d01e9..9842d6c 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GenerateTableFetch.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GenerateTableFetch.java
@@ -30,6 +30,7 @@ import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.annotation.lifecycle.OnStopped;
import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.PropertyValue;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.components.state.Scope;
@@ -98,8 +99,8 @@ import java.util.stream.IntStream;
@WritesAttribute(attribute = "generatetablefetch.offset", description = "Offset to be used to retrieve the corresponding partition.")
})
@DynamicProperty(name = "Initial Max Value", value = "Attribute Expression Language",
- expressionLanguageScope = ExpressionLanguageScope.NONE, description = "Specifies an initial "
- + "max value for max value columns. Properties should be added in the format `initial.maxvalue.{max_value_column}`.")
+ expressionLanguageScope = ExpressionLanguageScope.NONE, description = "Specifies an initial "
+ + "max value for max value columns. Properties should be added in the format `initial.maxvalue.{max_value_column}`.")
public class GenerateTableFetch extends AbstractDatabaseFetchProcessor {
public static final PropertyDescriptor PARTITION_SIZE = new PropertyDescriptor.Builder()
@@ -115,6 +116,19 @@ public class GenerateTableFetch extends AbstractDatabaseFetchProcessor {
.addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
.build();
+ static final PropertyDescriptor COLUMN_FOR_VALUE_PARTITIONING = new PropertyDescriptor.Builder()
+ .name("gen-table-column-for-val-partitioning")
+ .displayName("Column for Value Partitioning")
+ .description("The name of a column whose values will be used for partitioning. The default behavior is to use row numbers on the result set for partitioning into "
+ + "'pages' to be fetched from the database, using an offset/limit strategy. However for certain databases, it can be more efficient under the right circumstances to use "
+ + "the column values themselves to define the 'pages'. This property should only be used when the default queries are not performing well, when there is no maximum-value "
+ + "column or a single maximum-value column whose type can be coerced to a long integer (i.e. not date or timestamp), and the column values are evenly distributed and not "
+ + "sparse, for best performance.")
+ .required(false)
+ .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+ .build();
+
public static final Relationship REL_FAILURE = new Relationship.Builder()
.name("failure")
.description("This relationship is only used when SQL query execution (using an incoming FlowFile) failed. The incoming FlowFile will be penalized and routed to this relationship. "
@@ -135,6 +149,7 @@ public class GenerateTableFetch extends AbstractDatabaseFetchProcessor {
pds.add(MAX_VALUE_COLUMN_NAMES);
pds.add(QUERY_TIMEOUT);
pds.add(PARTITION_SIZE);
+ pds.add(COLUMN_FOR_VALUE_PARTITIONING);
pds.add(WHERE_CLAUSE);
propDescriptors = Collections.unmodifiableList(pds);
}
@@ -151,7 +166,15 @@ public class GenerateTableFetch extends AbstractDatabaseFetchProcessor {
@Override
protected Collection<ValidationResult> customValidate(ValidationContext validationContext) {
- return super.customValidate(validationContext);
+ List<ValidationResult> results = new ArrayList<>(super.customValidate(validationContext));
+ final PropertyValue columnForPartitioning = validationContext.getProperty(COLUMN_FOR_VALUE_PARTITIONING);
+ // If no EL is present, ensure it's a single column (i.e. no commas in the property value)
+ if (columnForPartitioning.isSet() && !columnForPartitioning.isExpressionLanguagePresent() && columnForPartitioning.getValue().contains(",")) {
+ results.add(new ValidationResult.Builder().valid(false).explanation(
+ COLUMN_FOR_VALUE_PARTITIONING.getDisplayName() + " requires a single column name, but a comma was detected").build());
+ }
+
+ return results;
}
@Override
@@ -195,6 +218,8 @@ public class GenerateTableFetch extends AbstractDatabaseFetchProcessor {
final String columnNames = context.getProperty(COLUMN_NAMES).evaluateAttributeExpressions(fileToProcess).getValue();
final String maxValueColumnNames = context.getProperty(MAX_VALUE_COLUMN_NAMES).evaluateAttributeExpressions(fileToProcess).getValue();
final int partitionSize = context.getProperty(PARTITION_SIZE).evaluateAttributeExpressions(fileToProcess).asInteger();
+ final String columnForPartitioning = context.getProperty(COLUMN_FOR_VALUE_PARTITIONING).evaluateAttributeExpressions(fileToProcess).getValue();
+ final boolean useColumnValsForPaging = !StringUtils.isEmpty(columnForPartitioning);
final String customWhereClause = context.getProperty(WHERE_CLAUSE).evaluateAttributeExpressions(fileToProcess).getValue();
final StateManager stateManager = context.getStateManager();
@@ -241,20 +266,24 @@ public class GenerateTableFetch extends AbstractDatabaseFetchProcessor {
List<String> maxValueColumnNameList = StringUtils.isEmpty(maxValueColumnNames)
? new ArrayList<>(0)
: Arrays.asList(maxValueColumnNames.split("\\s*,\\s*"));
- List<String> maxValueClauses = new ArrayList<>(maxValueColumnNameList.size());
+ final int numMaxValueColumns = maxValueColumnNameList.size();
+
+ List<String> maxValueClauses = new ArrayList<>(numMaxValueColumns);
+ Long maxValueForPartitioning = null;
+ Long minValueForPartitioning = null;
String columnsClause = null;
- List<String> maxValueSelectColumns = new ArrayList<>(maxValueColumnNameList.size() + 1);
+ List<String> maxValueSelectColumns = new ArrayList<>(numMaxValueColumns + 1);
maxValueSelectColumns.add("COUNT(*)");
// For each maximum-value column, get a WHERE filter and a MAX(column) alias
- IntStream.range(0, maxValueColumnNameList.size()).forEach((index) -> {
+ IntStream.range(0, numMaxValueColumns).forEach((index) -> {
String colName = maxValueColumnNameList.get(index);
maxValueSelectColumns.add("MAX(" + colName + ") " + colName);
String maxValue = getColumnStateMaxValue(tableName, statePropertyMap, colName, dbAdapter);
if (!StringUtils.isEmpty(maxValue)) {
- if(columnTypeMap.isEmpty() || getColumnType(tableName, colName, dbAdapter) == null){
+ if (columnTypeMap.isEmpty() || getColumnType(tableName, colName, dbAdapter) == null) {
// This means column type cache is clean after instance reboot. We should re-cache column type
super.setup(context, false, finalFileToProcess);
}
@@ -263,8 +292,18 @@ public class GenerateTableFetch extends AbstractDatabaseFetchProcessor {
// Add a condition for the WHERE clause
maxValueClauses.add(colName + (index == 0 ? " > " : " >= ") + getLiteralByType(type, maxValue, dbAdapter.getName()));
}
+
});
+ // If we are using a columns' values, get the maximum and minimum values in the context of the aforementioned WHERE clause
+ if (useColumnValsForPaging) {
+ if(columnForPartitioning.contains(",")) {
+ throw new ProcessException(COLUMN_FOR_VALUE_PARTITIONING.getDisplayName() + " requires a single column name, but a comma was detected");
+ }
+ maxValueSelectColumns.add("MAX(" + columnForPartitioning + ") " + columnForPartitioning);
+ maxValueSelectColumns.add("MIN(" + columnForPartitioning + ") MIN_" + columnForPartitioning);
+ }
+
if (customWhereClause != null) {
// adding the custom WHERE clause (if defined) to the list of existing clauses.
maxValueClauses.add("(" + customWhereClause + ")");
@@ -294,7 +333,8 @@ public class GenerateTableFetch extends AbstractDatabaseFetchProcessor {
// Update the state map with the newly-observed maximum values
ResultSetMetaData rsmd = resultSet.getMetaData();
- for (int i = 2; i <= rsmd.getColumnCount(); i++) {
+ int i = 2;
+ for (; i <= numMaxValueColumns + 1; i++) {
//Some JDBC drivers consider the columns name and label to be very different things.
// Since this column has been aliased lets check the label first,
// if there is no label we'll use the column name.
@@ -318,11 +358,18 @@ public class GenerateTableFetch extends AbstractDatabaseFetchProcessor {
if (newMaxValue != null) {
statePropertyMap.put(fullyQualifiedStateKey, newMaxValue);
}
- } catch (ParseException | IOException pie) {
+ } catch (ParseException | IOException | ClassCastException pice) {
// Fail the whole thing here before we start creating flow files and such
- throw new ProcessException(pie);
+ throw new ProcessException(pice);
}
-
+ }
+ // Process the maximum and minimum values for the partitioning column if necessary
+ // These are currently required to be Long values, will throw a ClassCastException if they are not
+ if (useColumnValsForPaging) {
+ Object o = resultSet.getObject(i);
+ maxValueForPartitioning = o == null ? null : Long.valueOf(o.toString());
+ o = resultSet.getObject(i + 1);
+ minValueForPartitioning = o == null ? null : Long.valueOf(o.toString());
}
} else {
// Something is very wrong here, one row (even if count is zero) should be returned
@@ -330,13 +377,13 @@ public class GenerateTableFetch extends AbstractDatabaseFetchProcessor {
}
// for each maximum-value column get a right bounding WHERE condition
- IntStream.range(0, maxValueColumnNameList.size()).forEach((index) -> {
+ IntStream.range(0, numMaxValueColumns).forEach((index) -> {
String colName = maxValueColumnNameList.get(index);
maxValueSelectColumns.add("MAX(" + colName + ") " + colName);
String maxValue = getColumnStateMaxValue(tableName, statePropertyMap, colName, dbAdapter);
if (!StringUtils.isEmpty(maxValue)) {
- if(columnTypeMap.isEmpty() || getColumnType(tableName, colName, dbAdapter) == null){
+ if (columnTypeMap.isEmpty() || getColumnType(tableName, colName, dbAdapter) == null) {
// This means column type cache is clean after instance reboot. We should re-cache column type
super.setup(context, false, finalFileToProcess);
}
@@ -347,17 +394,30 @@ public class GenerateTableFetch extends AbstractDatabaseFetchProcessor {
}
});
- //Update WHERE list to include new right hand boundaries
- whereClause = StringUtils.join(maxValueClauses, " AND ");
-
- final long numberOfFetches = (partitionSize == 0) ? 1 : (rowCount / partitionSize) + (rowCount % partitionSize == 0 ? 0 : 1);
+ final long numberOfFetches;
+ if (useColumnValsForPaging) {
+ final long valueRangeSize = maxValueForPartitioning == null ? 0 : (maxValueForPartitioning - minValueForPartitioning + 1);
+ numberOfFetches = (partitionSize == 0) ? 1 : (valueRangeSize / partitionSize) + (valueRangeSize % partitionSize == 0 ? 0 : 1);
+ } else {
+ numberOfFetches = (partitionSize == 0) ? 1 : (rowCount / partitionSize) + (rowCount % partitionSize == 0 ? 0 : 1);
+ }
// Generate SQL statements to read "pages" of data
+ Long limit = partitionSize == 0 ? null : (long) partitionSize;
for (long i = 0; i < numberOfFetches; i++) {
- Long limit = partitionSize == 0 ? null : (long) partitionSize;
- Long offset = partitionSize == 0 ? null : i * partitionSize;
+ // Add a right bounding for the partitioning column if necessary (only on last partition, meaning we don't need the limit)
+ if ((i == numberOfFetches - 1) && useColumnValsForPaging && (maxValueClauses.isEmpty() || customWhereClause != null)) {
+ maxValueClauses.add(columnForPartitioning + " <= " + maxValueForPartitioning);
+ limit = null;
+ }
+
+ //Update WHERE list to include new right hand boundaries
+ whereClause = maxValueClauses.isEmpty() ? "1=1" : StringUtils.join(maxValueClauses, " AND ");
+
+ Long offset = partitionSize == 0 ? null : i * partitionSize + (useColumnValsForPaging ? minValueForPartitioning : 0);
+
final String maxColumnNames = StringUtils.join(maxValueColumnNameList, ", ");
- final String query = dbAdapter.getSelectStatement(tableName, columnNames, whereClause, maxColumnNames, limit, offset);
+ final String query = dbAdapter.getSelectStatement(tableName, columnNames, whereClause, maxColumnNames, limit, offset, columnForPartitioning);
FlowFile sqlFlowFile = (fileToProcess == null) ? session.create() : session.create(fileToProcess);
sqlFlowFile = session.write(sqlFlowFile, out -> out.write(query.getBytes()));
sqlFlowFile = session.putAttribute(sqlFlowFile, "generatetablefetch.tableName", tableName);
http://git-wip-us.apache.org/repos/asf/nifi/blob/0e09b98b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/db/DatabaseAdapter.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/db/DatabaseAdapter.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/db/DatabaseAdapter.java
index 4e1ad47..211d4c8 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/db/DatabaseAdapter.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/db/DatabaseAdapter.java
@@ -39,6 +39,23 @@ public interface DatabaseAdapter {
String getSelectStatement(String tableName, String columnNames, String whereClause, String orderByClause, Long limit, Long offset);
/**
+ * Returns a SQL SELECT statement with the given clauses applied. Note that if this method is overridden, the other overloaded methods
+ * need to be overridden as well, to call this method with columnForPartitioning = false
+ *
+ * @param tableName The name of the table to fetch rows from
+ * @param columnNames The names of the columns to fetch from the table
+ * @param whereClause The filter to apply to the statement. This should not include the WHERE keyword
+ * @param orderByClause The columns/clause used for ordering the result rows. This should not include the ORDER BY keywords
+ * @param limit The value for the LIMIT clause (i.e. the number of rows to return)
+ * @param offset The value for the OFFSET clause (i.e. the number of rows to skip)
+ * @param columnForPartitioning The (optional) column name that, if provided, the limit and offset values are based on values from the column itself (rather than the row number)
+ * @return A String containing a SQL SELECT statement with the given clauses applied
+ */
+ default String getSelectStatement(String tableName, String columnNames, String whereClause, String orderByClause, Long limit, Long offset, String columnForPartitioning) {
+ return getSelectStatement(tableName, columnNames, whereClause, orderByClause, limit, offset);
+ }
+
+ /**
* <p>Returns a bare identifier string by removing wrapping escape characters
* from identifier strings such as table and column names.</p>
* <p>The default implementation of this method removes double quotes.
http://git-wip-us.apache.org/repos/asf/nifi/blob/0e09b98b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/db/impl/GenericDatabaseAdapter.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/db/impl/GenericDatabaseAdapter.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/db/impl/GenericDatabaseAdapter.java
index c663361..9efd9e0 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/db/impl/GenericDatabaseAdapter.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/db/impl/GenericDatabaseAdapter.java
@@ -35,6 +35,11 @@ public class GenericDatabaseAdapter implements DatabaseAdapter {
@Override
public String getSelectStatement(String tableName, String columnNames, String whereClause, String orderByClause, Long limit, Long offset) {
+ return getSelectStatement(tableName, columnNames, whereClause, orderByClause, limit, offset, null);
+ }
+
+ @Override
+ public String getSelectStatement(String tableName, String columnNames, String whereClause, String orderByClause, Long limit, Long offset, String columnForPartitioning) {
if (StringUtils.isEmpty(tableName)) {
throw new IllegalArgumentException("Table name cannot be null or empty");
}
@@ -50,20 +55,33 @@ public class GenericDatabaseAdapter implements DatabaseAdapter {
if (!StringUtils.isEmpty(whereClause)) {
query.append(" WHERE ");
query.append(whereClause);
+ if (!StringUtils.isEmpty(columnForPartitioning)) {
+ query.append(" AND ");
+ query.append(columnForPartitioning);
+ query.append(" >= ");
+ query.append(offset != null ? offset : "0");
+ if (limit != null) {
+ query.append(" AND ");
+ query.append(columnForPartitioning);
+ query.append(" < ");
+ query.append((offset == null ? 0 : offset) + limit);
+ }
+ }
}
- if (!StringUtils.isEmpty(orderByClause)) {
+ if (!StringUtils.isEmpty(orderByClause) && StringUtils.isEmpty(columnForPartitioning)) {
query.append(" ORDER BY ");
query.append(orderByClause);
}
- if (limit != null) {
- query.append(" LIMIT ");
- query.append(limit);
+ if (StringUtils.isEmpty(columnForPartitioning)) {
+ if (limit != null) {
+ query.append(" LIMIT ");
+ query.append(limit);
+ }
+ if (offset != null && offset > 0) {
+ query.append(" OFFSET ");
+ query.append(offset);
+ }
}
- if (offset != null && offset > 0) {
- query.append(" OFFSET ");
- query.append(offset);
- }
-
return query.toString();
}
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/0e09b98b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/db/impl/MSSQL2008DatabaseAdapter.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/db/impl/MSSQL2008DatabaseAdapter.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/db/impl/MSSQL2008DatabaseAdapter.java
index 3759cb4..610b34d 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/db/impl/MSSQL2008DatabaseAdapter.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/db/impl/MSSQL2008DatabaseAdapter.java
@@ -34,14 +34,19 @@ public class MSSQL2008DatabaseAdapter extends MSSQLDatabaseAdapter {
@Override
public String getSelectStatement(String tableName, String columnNames, String whereClause, String orderByClause, Long limit, Long offset) {
+ return getSelectStatement(tableName, columnNames, whereClause, orderByClause, limit, offset, null);
+ }
+
+ @Override
+ public String getSelectStatement(String tableName, String columnNames, String whereClause, String orderByClause, Long limit, Long offset, String columnForPartitioning) {
if (StringUtils.isEmpty(tableName)) {
throw new IllegalArgumentException("Table name cannot be null or empty");
}
final StringBuilder query = new StringBuilder("SELECT ");
-
+ boolean useColumnForPartitioning = !StringUtils.isEmpty(columnForPartitioning);
// If this is a limit query and not a paging query then use TOP in MS SQL
- if (limit != null) {
+ if (limit != null && !useColumnForPartitioning) {
if (offset != null) {
query.append("* FROM (SELECT ");
@@ -60,7 +65,7 @@ public class MSSQL2008DatabaseAdapter extends MSSQLDatabaseAdapter {
query.append(columnNames);
}
- if (limit != null && offset != null && orderByClause != null) {
+ if (limit != null && offset != null && orderByClause != null && !useColumnForPartitioning) {
query.append(", ROW_NUMBER() OVER(ORDER BY ");
query.append(orderByClause);
query.append(" asc) rnum");
@@ -71,14 +76,26 @@ public class MSSQL2008DatabaseAdapter extends MSSQLDatabaseAdapter {
if (!StringUtils.isEmpty(whereClause)) {
query.append(" WHERE ");
query.append(whereClause);
+ if (useColumnForPartitioning) {
+ query.append(" AND ");
+ query.append(columnForPartitioning);
+ query.append(" >= ");
+ query.append(offset != null ? offset : "0");
+ if (limit != null) {
+ query.append(" AND ");
+ query.append(columnForPartitioning);
+ query.append(" < ");
+ query.append((offset == null ? 0 : offset) + limit);
+ }
+ }
}
- if (!StringUtils.isEmpty(orderByClause)) {
+ if (!StringUtils.isEmpty(orderByClause) && !useColumnForPartitioning) {
query.append(" ORDER BY ");
query.append(orderByClause);
}
- if (limit != null && offset != null) {
+ if (limit != null && offset != null && !useColumnForPartitioning) {
query.append(") A WHERE rnum > ");
query.append(offset);
query.append(" AND rnum <= ");
http://git-wip-us.apache.org/repos/asf/nifi/blob/0e09b98b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/db/impl/MSSQLDatabaseAdapter.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/db/impl/MSSQLDatabaseAdapter.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/db/impl/MSSQLDatabaseAdapter.java
index 3d23d9f..ec276fc 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/db/impl/MSSQLDatabaseAdapter.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/db/impl/MSSQLDatabaseAdapter.java
@@ -35,13 +35,18 @@ public class MSSQLDatabaseAdapter implements DatabaseAdapter {
@Override
public String getSelectStatement(String tableName, String columnNames, String whereClause, String orderByClause, Long limit, Long offset) {
+ return getSelectStatement(tableName, columnNames, whereClause, orderByClause, limit, offset, null);
+ }
+
+ @Override
+ public String getSelectStatement(String tableName, String columnNames, String whereClause, String orderByClause, Long limit, Long offset, String columnForPartitioning) {
if (StringUtils.isEmpty(tableName)) {
throw new IllegalArgumentException("Table name cannot be null or empty");
}
final StringBuilder query = new StringBuilder("SELECT ");
//If this is a limit query and not a paging query then use TOP in MS SQL
- if (limit != null && offset == null){
+ if (limit != null && offset == null && StringUtils.isEmpty(columnForPartitioning)){
query.append("TOP ");
query.append(limit);
query.append(" ");
@@ -58,23 +63,37 @@ public class MSSQLDatabaseAdapter implements DatabaseAdapter {
if (!StringUtils.isEmpty(whereClause)) {
query.append(" WHERE ");
query.append(whereClause);
+ if (!StringUtils.isEmpty(columnForPartitioning)) {
+ query.append(" AND ");
+ query.append(columnForPartitioning);
+ query.append(" >= ");
+ query.append(offset != null ? offset : "0");
+ if (limit != null) {
+ query.append(" AND ");
+ query.append(columnForPartitioning);
+ query.append(" < ");
+ query.append((offset == null ? 0 : offset) + limit);
+ }
+ }
}
- if (!StringUtils.isEmpty(orderByClause)) {
+ if (!StringUtils.isEmpty(orderByClause) && StringUtils.isEmpty(columnForPartitioning)) {
query.append(" ORDER BY ");
query.append(orderByClause);
}
- if (offset != null && limit != null && limit > 0) {
- if (StringUtils.isEmpty(orderByClause)) {
- throw new IllegalArgumentException("Order by clause cannot be null or empty when using row paging");
- }
+ if (StringUtils.isEmpty(columnForPartitioning)) {
+ if (offset != null && limit != null && limit > 0) {
+ if (StringUtils.isEmpty(orderByClause)) {
+ throw new IllegalArgumentException("Order by clause cannot be null or empty when using row paging");
+ }
- query.append(" OFFSET ");
- query.append(offset);
- query.append(" ROWS");
+ query.append(" OFFSET ");
+ query.append(offset);
+ query.append(" ROWS");
- query.append(" FETCH NEXT ");
- query.append(limit);
- query.append(" ROWS ONLY");
+ query.append(" FETCH NEXT ");
+ query.append(limit);
+ query.append(" ROWS ONLY");
+ }
}
return query.toString();
http://git-wip-us.apache.org/repos/asf/nifi/blob/0e09b98b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/db/impl/Oracle12DatabaseAdapter.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/db/impl/Oracle12DatabaseAdapter.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/db/impl/Oracle12DatabaseAdapter.java
index 77d92e1..c128ed1 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/db/impl/Oracle12DatabaseAdapter.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/db/impl/Oracle12DatabaseAdapter.java
@@ -35,6 +35,11 @@ public class Oracle12DatabaseAdapter implements DatabaseAdapter {
@Override
public String getSelectStatement(String tableName, String columnNames, String whereClause, String orderByClause, Long limit, Long offset) {
+ return getSelectStatement(tableName, columnNames, whereClause, orderByClause, limit, offset, null);
+ }
+
+ @Override
+ public String getSelectStatement(String tableName, String columnNames, String whereClause, String orderByClause, Long limit, Long offset, String columnForPartitioning) {
if (StringUtils.isEmpty(tableName)) {
throw new IllegalArgumentException("Table name cannot be null or empty");
}
@@ -51,20 +56,34 @@ public class Oracle12DatabaseAdapter implements DatabaseAdapter {
if (!StringUtils.isEmpty(whereClause)) {
query.append(" WHERE ");
query.append(whereClause);
+ if (!StringUtils.isEmpty(columnForPartitioning)) {
+ query.append(" AND ");
+ query.append(columnForPartitioning);
+ query.append(" >= ");
+ query.append(offset != null ? offset : "0");
+ if (limit != null) {
+ query.append(" AND ");
+ query.append(columnForPartitioning);
+ query.append(" < ");
+ query.append((offset == null ? 0 : offset) + limit);
+ }
+ }
}
- if (!StringUtils.isEmpty(orderByClause)) {
+ if (!StringUtils.isEmpty(orderByClause) && StringUtils.isEmpty(columnForPartitioning)) {
query.append(" ORDER BY ");
query.append(orderByClause);
}
- if (offset != null && offset > 0) {
- query.append(" OFFSET ");
- query.append(offset);
- query.append(" ROWS");
- }
- if (limit != null) {
- query.append(" FETCH NEXT ");
- query.append(limit);
- query.append(" ROWS ONLY");
+ if (StringUtils.isEmpty(columnForPartitioning)) {
+ if (offset != null && offset > 0) {
+ query.append(" OFFSET ");
+ query.append(offset);
+ query.append(" ROWS");
+ }
+ if (limit != null) {
+ query.append(" FETCH NEXT ");
+ query.append(limit);
+ query.append(" ROWS ONLY");
+ }
}
return query.toString();
http://git-wip-us.apache.org/repos/asf/nifi/blob/0e09b98b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/db/impl/OracleDatabaseAdapter.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/db/impl/OracleDatabaseAdapter.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/db/impl/OracleDatabaseAdapter.java
index 7ea57cc..5a1f099 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/db/impl/OracleDatabaseAdapter.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/db/impl/OracleDatabaseAdapter.java
@@ -35,12 +35,17 @@ public class OracleDatabaseAdapter implements DatabaseAdapter {
@Override
public String getSelectStatement(String tableName, String columnNames, String whereClause, String orderByClause, Long limit, Long offset) {
+ return getSelectStatement(tableName, columnNames, whereClause, orderByClause, limit, offset, null);
+ }
+
+ @Override
+ public String getSelectStatement(String tableName, String columnNames, String whereClause, String orderByClause, Long limit, Long offset, String columnForPartitioning) {
if (StringUtils.isEmpty(tableName)) {
throw new IllegalArgumentException("Table name cannot be null or empty");
}
final StringBuilder query = new StringBuilder();
- boolean nestedSelect = (limit != null || offset != null);
+ boolean nestedSelect = (limit != null || offset != null) && StringUtils.isEmpty(columnForPartitioning);
if (nestedSelect) {
// Need a nested SELECT query here in order to use ROWNUM to limit the results
query.append("SELECT ");
@@ -64,8 +69,20 @@ public class OracleDatabaseAdapter implements DatabaseAdapter {
if (!StringUtils.isEmpty(whereClause)) {
query.append(" WHERE ");
query.append(whereClause);
+ if (!StringUtils.isEmpty(columnForPartitioning)) {
+ query.append(" AND ");
+ query.append(columnForPartitioning);
+ query.append(" >= ");
+ query.append(offset != null ? offset : "0");
+ if (limit != null) {
+ query.append(" AND ");
+ query.append(columnForPartitioning);
+ query.append(" < ");
+ query.append((offset == null ? 0 : offset) + limit);
+ }
+ }
}
- if (!StringUtils.isEmpty(orderByClause)) {
+ if (!StringUtils.isEmpty(orderByClause) && StringUtils.isEmpty(columnForPartitioning)) {
query.append(" ORDER BY ");
query.append(orderByClause);
}
@@ -82,6 +99,7 @@ public class OracleDatabaseAdapter implements DatabaseAdapter {
query.append(") WHERE rnum > ");
query.append(offsetVal);
}
+
return query.toString();
}
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/0e09b98b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/db/impl/PhoenixDatabaseAdapter.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/db/impl/PhoenixDatabaseAdapter.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/db/impl/PhoenixDatabaseAdapter.java
index 92522db..a70d88a 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/db/impl/PhoenixDatabaseAdapter.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/db/impl/PhoenixDatabaseAdapter.java
@@ -36,8 +36,12 @@ public final class PhoenixDatabaseAdapter implements DatabaseAdapter {
}
@Override
- public String getSelectStatement(String tableName, String columnNames, String whereClause, String orderByClause,
- Long limit, Long offset) {
+ public String getSelectStatement(String tableName, String columnNames, String whereClause, String orderByClause, Long limit, Long offset) {
+ return getSelectStatement(tableName, columnNames, whereClause, orderByClause, limit, offset, null);
+ }
+
+ @Override
+ public String getSelectStatement(String tableName, String columnNames, String whereClause, String orderByClause, Long limit, Long offset, String columnForPartitioning) {
if (StringUtils.isEmpty(tableName)) {
throw new IllegalArgumentException("Table name cannot be null or empty");
}
@@ -53,20 +57,34 @@ public final class PhoenixDatabaseAdapter implements DatabaseAdapter {
if (!StringUtils.isEmpty(whereClause)) {
query.append(" WHERE ");
query.append(whereClause);
+ if (!StringUtils.isEmpty(columnForPartitioning)) {
+ query.append(" AND ");
+ query.append(columnForPartitioning);
+ query.append(" >= ");
+ query.append(offset != null ? offset : "0");
+ if (limit != null) {
+ query.append(" AND ");
+ query.append(columnForPartitioning);
+ query.append(" < ");
+ query.append((offset == null ? 0 : offset) + limit);
+ }
+ }
}
- if (!StringUtils.isEmpty(orderByClause)) {
+ if (!StringUtils.isEmpty(orderByClause) && StringUtils.isEmpty(columnForPartitioning)) {
query.append(" ORDER BY ");
query.append(orderByClause);
}
- if (limit != null) {
- query.append(" LIMIT ");
- query.append(limit);
- }
- if (offset != null && offset > 0) {
- query.append(" OFFSET ");
- query.append(offset);
- }
+ if (StringUtils.isEmpty(columnForPartitioning)) {
+ if (limit != null) {
+ query.append(" LIMIT ");
+ query.append(limit);
+ }
+ if (offset != null && offset > 0) {
+ query.append(" OFFSET ");
+ query.append(offset);
+ }
+ }
return query.toString();
}
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/0e09b98b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/docs/org.apache.nifi.processors.standard.GenerateTableFetch/additionalDetails.html
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/docs/org.apache.nifi.processors.standard.GenerateTableFetch/additionalDetails.html b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/docs/org.apache.nifi.processors.standard.GenerateTableFetch/additionalDetails.html
new file mode 100644
index 0000000..6829311
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/docs/org.apache.nifi.processors.standard.GenerateTableFetch/additionalDetails.html
@@ -0,0 +1,69 @@
+<!DOCTYPE html>
+<html lang="en">
+ <!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+ http://www.apache.org/licenses/LICENSE-2.0
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+ -->
+ <head>
+ <meta charset="utf-8" />
+ <title>GenerateTableFetch</title>
+
+ <link rel="stylesheet" href="../../../../../css/component-usage.css" type="text/css" />
+ </head>
+
+ <body>
+ <p>
+ GenerateTableFetch uses its properties and the specified database connection to generate flow files
+ containing SQL statements that can be used to fetch "pages" (aka "partitions") of data from a table.
+ GenerateTableFetch executes a query to the database to determine the current row count and maximum value,
+ and if Maximum Value Columns are specified, will collect the count of rows whose values for the Maximum
+ Value Columns are larger than those last observed by GenerateTableFetch. This allows for incremental
+ fetching of "new" rows, rather than generating SQL to fetch the entire table each time. If no Maximum
+ Value Columns are set, then the processor will generate SQL to fetch the entire table each time.
+ </p>
+
+ <p>
+ In order to generate SQL that will fetch pages/partitions of data, by default GenerateTableFetch will
+ generate SQL that orders the data based on the Maximum Value Columns (if present) and utilize the row
+ numbers of the result set to determine each page. For example if the Maximum Value Column is an integer "id"
+ and the partition size is 10, then the SQL for the first page might be "SELECT * FROM myTable LIMIT 10" and
+ the second page might be "SELECT * FROM myTable OFFSET 10 LIMIT 10", and so on.
+ </p>
+
+ <p>
+ Ordering the data can be an expensive operation depending on the database, the number of rows, etc.
+ Alternatively, it is possible to specify a column whose values will be used to determine the pages, using
+ the Column for Value Partitioning property. If set, GenerateTableFetch will determine the minimum and
+ maximum values for the column, and use the minimum value as the initial offset. The SQL to fetch a page is
+ then based on this initial offset and the total difference in values (i.e. maximum - minimum) divided by
+ the page size. For example, if the column "id" is used for value partitioning, and the column contains
+ values 100 to 200, then with a page size of 10 the SQL to fetch the first page might be "SELECT * FROM
+ myTable WHERE id >= 100 AND id < 110" and the second page might be "SELECT * FROM myTable WHERE id >= 110
+ AND id < 120", and so on.
+ </p>
+
+ <p>
+ It is important that the Column for Value Partitioning be set to a column whose type can be coerced to a
+ long integer (i.e. not date or timestamp), and that the column values are evenly distributed and not
+ sparse, for best performance. As a counterexample to the above, consider a column "id" whose values are 100,
+ 2000, and 30000. If the Partition Size is 100, then the column values are relatively sparse, so the SQL
+ for the "second page" (see above example) will return zero rows, and so will every page until the value in
+ the query becomes "id >= 2000". Another counterexample is when the values are not uniformly distributed.
+ Consider a column "id" with values 100, 200, 201, 202, ... 299. Then the SQL for the first page (see above
+ example) will return one row with value id = 100, and the second page will return 100 rows with values 200
+ ... 299. This can cause inconsistent processing times downstream, as the pages may contain a very different
+ number of rows. For these reasons it is recommended to use a Column for Value Partitioning that is
+ sufficiently dense (not sparse) and fairly evenly distributed.
+ </p>
+ </body>
+</html>
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/nifi/blob/0e09b98b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestGenerateTableFetch.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestGenerateTableFetch.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestGenerateTableFetch.java
index 253f4d0..f6c27fa 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestGenerateTableFetch.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestGenerateTableFetch.java
@@ -855,9 +855,9 @@ public class TestGenerateTableFetch {
@Test
public void testRidiculousRowCount() throws ClassNotFoundException, SQLException, InitializationException, IOException {
- long rowCount= Long.parseLong(Integer.toString(Integer.MAX_VALUE)) + 100;
+ long rowCount = Long.parseLong(Integer.toString(Integer.MAX_VALUE)) + 100;
int partitionSize = 1000000;
- int expectedFileCount = (int)(rowCount/partitionSize) + 1;
+ int expectedFileCount = (int) (rowCount / partitionSize) + 1;
Connection conn = mock(Connection.class);
when(dbcp.getConnection()).thenReturn(conn);
@@ -867,7 +867,7 @@ public class TestGenerateTableFetch {
ResultSet rs = mock(ResultSet.class);
when(st.executeQuery(anyString())).thenReturn(rs);
when(rs.next()).thenReturn(true);
- when(rs.getInt(1)).thenReturn((int)rowCount);
+ when(rs.getInt(1)).thenReturn((int) rowCount);
when(rs.getLong(1)).thenReturn(rowCount);
final ResultSetMetaData resultSetMetaData = mock(ResultSetMetaData.class);
@@ -890,7 +890,7 @@ public class TestGenerateTableFetch {
runner.assertAllFlowFilesTransferred(REL_SUCCESS, expectedFileCount);
MockFlowFile flowFile = runner.getFlowFilesForRelationship(REL_SUCCESS).get(0);
String query = new String(flowFile.toByteArray());
- assertEquals("SELECT * FROM TEST_QUERY_DB_TABLE ORDER BY ID FETCH NEXT 1000000 ROWS ONLY", query);
+ assertEquals("SELECT * FROM TEST_QUERY_DB_TABLE WHERE 1=1 ORDER BY ID FETCH NEXT 1000000 ROWS ONLY", query);
runner.clearTransferState();
}
@@ -997,7 +997,7 @@ public class TestGenerateTableFetch {
ResultSet resultSet = stmt.executeQuery(query);
int numberRecordsFirstExecution = 0; // Should be three records
- while(resultSet.next()) {
+ while (resultSet.next()) {
numberRecordsFirstExecution++;
}
runner.clearTransferState();
@@ -1010,7 +1010,7 @@ public class TestGenerateTableFetch {
resultSet = stmt.executeQuery(query);
int numberRecordsSecondExecution = 0; // Should be three records
- while(resultSet.next()) {
+ while (resultSet.next()) {
numberRecordsSecondExecution++;
}
@@ -1227,12 +1227,12 @@ public class TestGenerateTableFetch {
runner.run(2);
runner.assertAllFlowFilesTransferred(REL_SUCCESS, 2);
- assertEquals(2,processor.columnTypeMap.size());
+ assertEquals(2, processor.columnTypeMap.size());
runner.clearTransferState();
// Remove one element from columnTypeMap to simulate it's re-cache partial state
- Map.Entry<String,Integer> entry = processor.columnTypeMap.entrySet().iterator().next();
+ Map.Entry<String, Integer> entry = processor.columnTypeMap.entrySet().iterator().next();
String key = entry.getKey();
processor.columnTypeMap.remove(key);
@@ -1248,7 +1248,143 @@ public class TestGenerateTableFetch {
// It should re-cache column type
runner.run();
runner.assertAllFlowFilesTransferred(REL_SUCCESS, 1);
- assertEquals(2,processor.columnTypeMap.size());
+ assertEquals(2, processor.columnTypeMap.size());
+ runner.clearTransferState();
+ }
+
+ @Test
+ public void testUseColumnValuesForPartitioning() throws ClassNotFoundException, SQLException, InitializationException, IOException {
+
+ // load test data to database
+ final Connection con = ((DBCPService) runner.getControllerService("dbcp")).getConnection();
+ Statement stmt = con.createStatement();
+
+ try {
+ stmt.execute("drop table TEST_QUERY_DB_TABLE");
+ } catch (final SQLException sqle) {
+ // Ignore this error, probably a "table does not exist" since Derby doesn't yet support DROP IF EXISTS [DERBY-4842]
+ }
+
+ stmt.execute("create table TEST_QUERY_DB_TABLE (id integer not null, name varchar(100), scale float, created_on timestamp, bignum bigint default 0)");
+ stmt.execute("insert into TEST_QUERY_DB_TABLE (id, name, scale, created_on) VALUES (10, 'Joe Smith', 1.0, '1962-09-23 03:23:34.234')");
+ stmt.execute("insert into TEST_QUERY_DB_TABLE (id, name, scale, created_on) VALUES (11, 'Carrie Jones', 5.0, '2000-01-01 03:23:34.234')");
+ stmt.execute("insert into TEST_QUERY_DB_TABLE (id, name, scale, created_on) VALUES (12, NULL, 2.0, '2010-01-01 00:00:00')");
+
+ runner.setProperty(GenerateTableFetch.TABLE_NAME, "TEST_QUERY_DB_TABLE");
+ runner.setIncomingConnection(false);
+ runner.setProperty(GenerateTableFetch.MAX_VALUE_COLUMN_NAMES, "ID");
+ runner.setProperty(GenerateTableFetch.COLUMN_FOR_VALUE_PARTITIONING, "ID");
+ runner.setProperty(GenerateTableFetch.PARTITION_SIZE, "2");
+
+ runner.run();
+ runner.assertAllFlowFilesTransferred(REL_SUCCESS, 2);
+ // First flow file
+ MockFlowFile flowFile = runner.getFlowFilesForRelationship(REL_SUCCESS).get(0);
+ String query = new String(flowFile.toByteArray());
+ assertEquals("SELECT * FROM TEST_QUERY_DB_TABLE WHERE ID <= 12 AND ID >= 10 AND ID < 12", query);
+ ResultSet resultSet = stmt.executeQuery(query);
+ // Should be two records
+ assertTrue(resultSet.next());
+ assertTrue(resultSet.next());
+ assertFalse(resultSet.next());
+ // Second flow file
+ flowFile = runner.getFlowFilesForRelationship(REL_SUCCESS).get(1);
+ query = new String(flowFile.toByteArray());
+ assertEquals("SELECT * FROM TEST_QUERY_DB_TABLE WHERE ID <= 12 AND ID >= 12 AND ID < 14", query);
+ resultSet = stmt.executeQuery(query);
+ // Should be one record
+ assertTrue(resultSet.next());
+ assertFalse(resultSet.next());
+ runner.clearTransferState();
+
+ // Run again, this time no flowfiles/rows should be transferred
+ runner.run();
+ runner.assertAllFlowFilesTransferred(REL_SUCCESS, 0);
+ runner.clearTransferState();
+
+ // Add 3 new rows with a higher ID and run with a partition size of 2. Three flow files should be transferred
+ stmt.execute("insert into TEST_QUERY_DB_TABLE (id, name, scale, created_on) VALUES (20, 'Mary West', 15.0, '2000-01-01 03:23:34.234')");
+ stmt.execute("insert into TEST_QUERY_DB_TABLE (id, name, scale, created_on) VALUES (21, 'Marty Johnson', 15.0, '2011-01-01 03:23:34.234')");
+ stmt.execute("insert into TEST_QUERY_DB_TABLE (id, name, scale, created_on) VALUES (24, 'Marty Johnson', 15.0, '2011-01-01 03:23:34.234')");
+ runner.run();
+ runner.assertAllFlowFilesTransferred(REL_SUCCESS, 3);
+
+ // Verify first flow file's contents
+ flowFile = runner.getFlowFilesForRelationship(REL_SUCCESS).get(0);
+ query = new String(flowFile.toByteArray());
+ assertEquals("SELECT * FROM TEST_QUERY_DB_TABLE WHERE ID > 12 AND ID <= 24 AND ID >= 20 AND ID < 22", query);
+ resultSet = stmt.executeQuery(query);
+ // Should be two records
+ assertTrue(resultSet.next());
+ assertTrue(resultSet.next());
+ assertFalse(resultSet.next());
+
+ // Verify second flow file's contents
+ flowFile = runner.getFlowFilesForRelationship(REL_SUCCESS).get(1);
+ query = new String(flowFile.toByteArray());
+ assertEquals("SELECT * FROM TEST_QUERY_DB_TABLE WHERE ID > 12 AND ID <= 24 AND ID >= 22 AND ID < 24", query);
+ resultSet = stmt.executeQuery(query);
+ // Should be no records
+ assertFalse(resultSet.next());
+
+ // Verify third flow file's contents
+ flowFile = runner.getFlowFilesForRelationship(REL_SUCCESS).get(2);
+ query = new String(flowFile.toByteArray());
+ assertEquals("SELECT * FROM TEST_QUERY_DB_TABLE WHERE ID > 12 AND ID <= 24 AND ID >= 24 AND ID < 26", query);
+ resultSet = stmt.executeQuery(query);
+ // Should be one record
+ assertTrue(resultSet.next());
+ assertFalse(resultSet.next());
+ runner.clearTransferState();
+ }
+
+ @Test
+ public void testUseColumnValuesForPartitioningNoMaxValueColumn() throws ClassNotFoundException, SQLException, InitializationException, IOException {
+
+ // load test data to database
+ final Connection con = ((DBCPService) runner.getControllerService("dbcp")).getConnection();
+ Statement stmt = con.createStatement();
+
+ try {
+ stmt.execute("drop table TEST_QUERY_DB_TABLE");
+ } catch (final SQLException sqle) {
+ // Ignore this error, probably a "table does not exist" since Derby doesn't yet support DROP IF EXISTS [DERBY-4842]
+ }
+
+ stmt.execute("create table TEST_QUERY_DB_TABLE (id integer not null, name varchar(100), scale float, created_on timestamp, bignum bigint default 0)");
+ stmt.execute("insert into TEST_QUERY_DB_TABLE (id, name, scale, created_on) VALUES (10, 'Joe Smith', 1.0, '1962-09-23 03:23:34.234')");
+ stmt.execute("insert into TEST_QUERY_DB_TABLE (id, name, scale, created_on) VALUES (11, 'Carrie Jones', 5.0, '2000-01-01 03:23:34.234')");
+ stmt.execute("insert into TEST_QUERY_DB_TABLE (id, name, scale, created_on) VALUES (12, NULL, 2.0, '2010-01-01 00:00:00')");
+
+ runner.setProperty(GenerateTableFetch.TABLE_NAME, "TEST_QUERY_DB_TABLE");
+ runner.setIncomingConnection(false);
+ runner.setProperty(GenerateTableFetch.COLUMN_FOR_VALUE_PARTITIONING, "ID");
+ runner.setProperty(GenerateTableFetch.PARTITION_SIZE, "2");
+
+ runner.run();
+ runner.assertAllFlowFilesTransferred(REL_SUCCESS, 2);
+ // First flow file
+ MockFlowFile flowFile = runner.getFlowFilesForRelationship(REL_SUCCESS).get(0);
+ String query = new String(flowFile.toByteArray());
+ assertEquals("SELECT * FROM TEST_QUERY_DB_TABLE WHERE 1=1 AND ID >= 10 AND ID < 12", query);
+ ResultSet resultSet = stmt.executeQuery(query);
+ // Should be two records
+ assertTrue(resultSet.next());
+ assertTrue(resultSet.next());
+ assertFalse(resultSet.next());
+ // Second flow file
+ flowFile = runner.getFlowFilesForRelationship(REL_SUCCESS).get(1);
+ query = new String(flowFile.toByteArray());
+ assertEquals("SELECT * FROM TEST_QUERY_DB_TABLE WHERE ID <= 12 AND ID >= 12", query);
+ resultSet = stmt.executeQuery(query);
+ // Should be one record
+ assertTrue(resultSet.next());
+ assertFalse(resultSet.next());
+ runner.clearTransferState();
+
+ // Run again, the same flowfiles should be transferred as we have no maximum-value column
+ runner.run();
+ runner.assertAllFlowFilesTransferred(REL_SUCCESS, 2);
runner.clearTransferState();
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/0e09b98b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/db/impl/DerbyDatabaseAdapter.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/db/impl/DerbyDatabaseAdapter.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/db/impl/DerbyDatabaseAdapter.java
index 636f886..579e39e 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/db/impl/DerbyDatabaseAdapter.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/db/impl/DerbyDatabaseAdapter.java
@@ -36,6 +36,11 @@ public class DerbyDatabaseAdapter implements DatabaseAdapter {
@Override
public String getSelectStatement(String tableName, String columnNames, String whereClause, String orderByClause, Long limit, Long offset) {
+ return getSelectStatement(tableName, columnNames, whereClause, orderByClause, limit, offset, null);
+ }
+
+ @Override
+ public String getSelectStatement(String tableName, String columnNames, String whereClause, String orderByClause, Long limit, Long offset, String columnForPartitioning) {
if (StringUtils.isEmpty(tableName)) {
throw new IllegalArgumentException("Table name cannot be null or empty");
}
@@ -51,21 +56,37 @@ public class DerbyDatabaseAdapter implements DatabaseAdapter {
if (!StringUtils.isEmpty(whereClause)) {
query.append(" WHERE ");
query.append(whereClause);
+ if (!StringUtils.isEmpty(columnForPartitioning)) {
+ query.append(" AND ");
+ query.append(columnForPartitioning);
+ query.append(" >= ");
+ query.append(offset != null ? offset : "0");
+ if (limit != null) {
+ query.append(" AND ");
+ query.append(columnForPartitioning);
+ query.append(" < ");
+ query.append((offset == null ? 0 : offset) + limit);
+ }
+ }
}
- if (!StringUtils.isEmpty(orderByClause)) {
+
+ if (!StringUtils.isEmpty(orderByClause) && StringUtils.isEmpty(columnForPartitioning)) {
query.append(" ORDER BY ");
query.append(orderByClause);
+
}
- if (offset != null && offset > 0) {
- query.append(" OFFSET ");
- query.append(offset);
- query.append(" ROWS");
- }
+ if (StringUtils.isEmpty(columnForPartitioning)) {
+ if (offset != null && offset > 0) {
+ query.append(" OFFSET ");
+ query.append(offset);
+ query.append(" ROWS");
+ }
- if (limit != null) {
- query.append(" FETCH NEXT ");
- query.append(limit);
- query.append(" ROWS ONLY");
+ if (limit != null) {
+ query.append(" FETCH NEXT ");
+ query.append(limit);
+ query.append(" ROWS ONLY");
+ }
}
return query.toString();
http://git-wip-us.apache.org/repos/asf/nifi/blob/0e09b98b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/db/impl/TestMSSQL2008DatabaseAdapter.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/db/impl/TestMSSQL2008DatabaseAdapter.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/db/impl/TestMSSQL2008DatabaseAdapter.java
index 1390072..912a720 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/db/impl/TestMSSQL2008DatabaseAdapter.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/db/impl/TestMSSQL2008DatabaseAdapter.java
@@ -79,4 +79,28 @@ public class TestMSSQL2008DatabaseAdapter {
+ "WHERE methods='strange' ORDER BY contain) A WHERE rnum > 123456 AND rnum <= 133456";
Assert.assertEquals(expected3, sql);
}
+
+ @Test
+ public void testPagingQueryUsingColumnValuesForPartitioning() {
+ String sql1 = db.getSelectStatement("database.tablename", "some(set),of(columns),that,might,contain,methods,a.*", "1=1", "contain",
+ 100L, 0L, "contain");
+ String expected1 = "SELECT some(set),of(columns),that,might,contain,methods,a.* FROM database.tablename WHERE 1=1 AND contain >= 0 AND contain < 100";
+ Assert.assertEquals(expected1, sql1);
+
+ String sql2 = db.getSelectStatement("database.tablename", "some(set),of(columns),that,might,contain,methods,a.*", "1=1", "contain",
+ 10000L, 123456L, "contain");
+ String expected2 = "SELECT some(set),of(columns),that,might,contain,methods,a.* FROM database.tablename WHERE 1=1 AND contain >= 123456 AND contain < 133456";
+ Assert.assertEquals(expected2, sql2);
+
+ String sql3 = db.getSelectStatement("database.tablename", "some(set),of(columns),that,might,contain,methods,a.*", "methods='strange'",
+ "contain", 10000L, 123456L, "contain");
+ String expected3 = "SELECT some(set),of(columns),that,might,contain,methods,a.* FROM database.tablename WHERE methods='strange' AND contain >= 123456 AND contain < 133456";
+ Assert.assertEquals(expected3, sql3);
+
+ // Paging (limit/offset) is only supported when an orderByClause is supplied; note that it is not honored here
+ String sql4 = db.getSelectStatement("database.tablename", "some(set),of(columns),that,might,contain,methods,a.*", "", "",
+ 100L, null, "contain");
+ String expected4 = "SELECT some(set),of(columns),that,might,contain,methods,a.* FROM database.tablename";
+ Assert.assertEquals(expected4, sql4);
+ }
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/0e09b98b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/db/impl/TestMSSQLDatabaseAdapter.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/db/impl/TestMSSQLDatabaseAdapter.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/db/impl/TestMSSQLDatabaseAdapter.java
index 9c0a0ff..de33753 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/db/impl/TestMSSQLDatabaseAdapter.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/db/impl/TestMSSQLDatabaseAdapter.java
@@ -81,4 +81,28 @@ public class TestMSSQLDatabaseAdapter {
String expected3 = "SELECT some(set),of(columns),that,might,contain,methods,a.* FROM database.tablename WHERE methods='strange' ORDER BY contain OFFSET 123456 ROWS FETCH NEXT 10000 ROWS ONLY";
Assert.assertEquals(sql3,expected3);
}
+
+ @Test
+ public void testPagingQueryUsingColumnValuesForPartitioning() {
+ String sql1 = db.getSelectStatement("database.tablename", "some(set),of(columns),that,might,contain,methods,a.*", "1=1", "contain",
+ 100L, 0L, "contain");
+ String expected1 = "SELECT some(set),of(columns),that,might,contain,methods,a.* FROM database.tablename WHERE 1=1 AND contain >= 0 AND contain < 100";
+ Assert.assertEquals(expected1, sql1);
+
+ String sql2 = db.getSelectStatement("database.tablename", "some(set),of(columns),that,might,contain,methods,a.*", "1=1", "contain",
+ 10000L, 123456L, "contain");
+ String expected2 = "SELECT some(set),of(columns),that,might,contain,methods,a.* FROM database.tablename WHERE 1=1 AND contain >= 123456 AND contain < 133456";
+ Assert.assertEquals(expected2, sql2);
+
+ String sql3 = db.getSelectStatement("database.tablename", "some(set),of(columns),that,might,contain,methods,a.*", "methods='strange'",
+ "contain", 10000L, 123456L, "contain");
+ String expected3 = "SELECT some(set),of(columns),that,might,contain,methods,a.* FROM database.tablename WHERE methods='strange' AND contain >= 123456 AND contain < 133456";
+ Assert.assertEquals(expected3, sql3);
+
+ // Paging (limit/offset) is only supported when an orderByClause is supplied; note that it is not honored here
+ String sql4 = db.getSelectStatement("database.tablename", "some(set),of(columns),that,might,contain,methods,a.*", "", "",
+ 100L, null, "contain");
+ String expected4 = "SELECT some(set),of(columns),that,might,contain,methods,a.* FROM database.tablename";
+ Assert.assertEquals(expected4, sql4);
+ }
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/0e09b98b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/db/impl/TestOracle12DatabaseAdapter.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/db/impl/TestOracle12DatabaseAdapter.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/db/impl/TestOracle12DatabaseAdapter.java
index 2a80ab2..2315e98 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/db/impl/TestOracle12DatabaseAdapter.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/db/impl/TestOracle12DatabaseAdapter.java
@@ -26,45 +26,64 @@ public class TestOracle12DatabaseAdapter {
@Test
public void testGeneration() throws Exception {
- String sql1 = db.getSelectStatement("database.tablename", "some(set),of(columns),that,might,contain,methods,a.*","","",null,null);
+ String sql1 = db.getSelectStatement("database.tablename", "some(set),of(columns),that,might,contain,methods,a.*", "", "", null, null);
String expected1 = "SELECT some(set),of(columns),that,might,contain,methods,a.* FROM database.tablename";
- Assert.assertEquals(sql1,expected1);
+ Assert.assertEquals(sql1, expected1);
- String sql2 = db.getSelectStatement("database.tablename", "some(set),of(columns),that,might,contain,methods,a.*","that=\'some\"\' value\'","",null,null);
+ String sql2 = db.getSelectStatement("database.tablename", "some(set),of(columns),that,might,contain,methods,a.*", "that=\'some\"\' value\'", "", null, null);
String expected2 = "SELECT some(set),of(columns),that,might,contain,methods,a.* FROM database.tablename WHERE that=\'some\"\' value\'";
- Assert.assertEquals(sql2,expected2);
+ Assert.assertEquals(sql2, expected2);
- String sql3 = db.getSelectStatement("database.tablename", "some(set),of(columns),that,might,contain,methods,a.*","that=\'some\"\' value\'","might DESC",null,null);
+ String sql3 = db.getSelectStatement("database.tablename", "some(set),of(columns),that,might,contain,methods,a.*", "that=\'some\"\' value\'", "might DESC", null, null);
String expected3 = "SELECT some(set),of(columns),that,might,contain,methods,a.* FROM database.tablename WHERE that=\'some\"\' value\' ORDER BY might DESC";
- Assert.assertEquals(sql3,expected3);
+ Assert.assertEquals(sql3, expected3);
- String sql4 = db.getSelectStatement("database.tablename", "","that=\'some\"\' value\'","might DESC",null,null);
+ String sql4 = db.getSelectStatement("database.tablename", "", "that=\'some\"\' value\'", "might DESC", null, null);
String expected4 = "SELECT * FROM database.tablename WHERE that=\'some\"\' value\' ORDER BY might DESC";
- Assert.assertEquals(sql4,expected4);
+ Assert.assertEquals(sql4, expected4);
}
@Test(expected = IllegalArgumentException.class)
public void testNoTableName() throws Exception {
- db.getSelectStatement("", "some(set),of(columns),that,might,contain,methods,a.*","","",null,null);
+ db.getSelectStatement("", "some(set),of(columns),that,might,contain,methods,a.*", "", "", null, null);
}
@Test
public void testPagingQuery() throws Exception {
- String sql1 = db.getSelectStatement("database.tablename", "some(set),of(columns),that,might,contain,methods,a.*","","contain",100L,0L);
+ String sql1 = db.getSelectStatement("database.tablename", "some(set),of(columns),that,might,contain,methods,a.*", "", "contain", 100L, 0L);
String expected1 = "SELECT some(set),of(columns),that,might,contain,methods,a.* FROM database.tablename ORDER BY contain FETCH NEXT 100 ROWS ONLY";
- Assert.assertEquals(sql1,expected1);
+ Assert.assertEquals(sql1, expected1);
- String sql2 = db.getSelectStatement("database.tablename", "some(set),of(columns),that,might,contain,methods,a.*","","contain",10000L,123456L);
+ String sql2 = db.getSelectStatement("database.tablename", "some(set),of(columns),that,might,contain,methods,a.*", "", "contain", 10000L, 123456L);
String expected2 = "SELECT some(set),of(columns),that,might,contain,methods,a.* FROM database.tablename ORDER BY contain OFFSET 123456 ROWS FETCH NEXT 10000 ROWS ONLY";
- Assert.assertEquals(sql2,expected2);
+ Assert.assertEquals(sql2, expected2);
- String sql3 = db.getSelectStatement("database.tablename", "some(set),of(columns),that,might,contain,methods,a.*","methods='strange'","contain",10000L,123456L);
+ String sql3 = db.getSelectStatement("database.tablename", "some(set),of(columns),that,might,contain,methods,a.*", "methods='strange'", "contain", 10000L, 123456L);
String expected3 = "SELECT some(set),of(columns),that,might,contain,methods,a.* FROM database.tablename WHERE methods='strange' ORDER BY contain OFFSET 123456 ROWS FETCH NEXT 10000 ROWS ONLY";
- Assert.assertEquals(sql3,expected3);
+ Assert.assertEquals(sql3, expected3);
- String sql4 = db.getSelectStatement("database.tablename", "some(set),of(columns),that,might,contain,methods,a.*","","",100L,null);
+ String sql4 = db.getSelectStatement("database.tablename", "some(set),of(columns),that,might,contain,methods,a.*", "", "", 100L, null);
String expected4 = "SELECT some(set),of(columns),that,might,contain,methods,a.* FROM database.tablename FETCH NEXT 100 ROWS ONLY";
- Assert.assertEquals(sql4,expected4);
+ Assert.assertEquals(sql4, expected4);
}
+ @Test
+ public void testPagingQueryUsingColumnValuesForPartitioning() throws Exception {
+ String sql1 = db.getSelectStatement("database.tablename", "some(set),of(columns),that,might,contain,methods,a.*", "1=1", "contain", 100L, 0L, "contain");
+ String expected1 = "SELECT some(set),of(columns),that,might,contain,methods,a.* FROM database.tablename WHERE 1=1 AND contain >= 0 AND contain < 100";
+ Assert.assertEquals(expected1, sql1);
+
+ String sql2 = db.getSelectStatement("database.tablename", "some(set),of(columns),that,might,contain,methods,a.*", "1=1", "contain", 10000L, 123456L, "contain");
+ String expected2 = "SELECT some(set),of(columns),that,might,contain,methods,a.* FROM database.tablename WHERE 1=1 AND contain >= 123456 AND contain < 133456";
+ Assert.assertEquals(expected2, sql2);
+
+ String sql3 = db.getSelectStatement("database.tablename", "some(set),of(columns),that,might,contain,methods,a.*", "methods='strange'", "contain", 10000L, 123456L, "contain");
+ String expected3 = "SELECT some(set),of(columns),that,might,contain,methods,a.* FROM database.tablename WHERE methods='strange' AND contain >= 123456 AND contain < 133456";
+ Assert.assertEquals(expected3, sql3);
+
+ // Paging (limit/offset) is only supported when an orderByClause is supplied, note that it is not honored here
+ String sql4 = db.getSelectStatement("database.tablename", "some(set),of(columns),that,might,contain,methods,a.*", "", "", 100L, null, "contain");
+ String expected4 = "SELECT some(set),of(columns),that,might,contain,methods,a.* FROM database.tablename";
+ Assert.assertEquals(expected4, sql4);
+ }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/nifi/blob/0e09b98b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/db/impl/TestOracleDatabaseAdapter.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/db/impl/TestOracleDatabaseAdapter.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/db/impl/TestOracleDatabaseAdapter.java
new file mode 100644
index 0000000..d33b03c
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/db/impl/TestOracleDatabaseAdapter.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.standard.db.impl;
+
+import org.apache.nifi.processors.standard.db.DatabaseAdapter;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class TestOracleDatabaseAdapter {
+
+ private final DatabaseAdapter db = new OracleDatabaseAdapter();
+
+ @Test
+ public void testGeneration() {
+ String sql1 = db.getSelectStatement("database.tablename", "some(set),of(columns),that,might,contain,methods,a.*", "", "", null, null);
+ String expected1 = "SELECT some(set),of(columns),that,might,contain,methods,a.* FROM database.tablename";
+ Assert.assertEquals(expected1, sql1);
+
+ String sql2 = db.getSelectStatement("database.tablename", "some(set),of(columns),that,might,contain,methods,a.*", "that=\'some\"\' value\'", "", null, null);
+ String expected2 = "SELECT some(set),of(columns),that,might,contain,methods,a.* FROM database.tablename WHERE that=\'some\"\' value\'";
+ Assert.assertEquals(expected2, sql2);
+
+ String sql3 = db.getSelectStatement("database.tablename", "some(set),of(columns),that,might,contain,methods,a.*", "that=\'some\"\' value\'", "might DESC", null, null);
+ String expected3 = "SELECT some(set),of(columns),that,might,contain,methods,a.* FROM database.tablename WHERE that=\'some\"\' value\' ORDER BY might DESC";
+ Assert.assertEquals(expected3, sql3);
+
+ String sql4 = db.getSelectStatement("database.tablename", "", "that=\'some\"\' value\'", "might DESC", null, null);
+ String expected4 = "SELECT * FROM database.tablename WHERE that=\'some\"\' value\' ORDER BY might DESC";
+ Assert.assertEquals(expected4, sql4);
+ }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void testNoTableName() throws IllegalArgumentException {
+ db.getSelectStatement("", "some(set),of(columns),that,might,contain,methods,a.*", "", "", null, null);
+ }
+
+ @Test
+ public void testPagingQuery() {
+ String sql1 = db.getSelectStatement("database.tablename", "some(set),of(columns),that,might,contain,methods,a.*", "", "contain",
+ 100L, 0L);
+ String expected1 = "SELECT some(set),of(columns),that,might,contain,methods,a.* FROM (SELECT a.*, ROWNUM rnum FROM (SELECT some(set),of(columns),that,might,contain,methods,a.* "
+ + "FROM database.tablename ORDER BY contain) a WHERE ROWNUM <= 100) WHERE rnum > 0";
+ Assert.assertEquals(expected1, sql1);
+
+ String sql2 = db.getSelectStatement("database.tablename", "some(set),of(columns),that,might,contain,methods,a.*", "", "contain",
+ 10000L, 123456L);
+ String expected2 = "SELECT some(set),of(columns),that,might,contain,methods,a.* FROM (SELECT a.*, ROWNUM rnum FROM (SELECT some(set),of(columns),that,might,contain,methods,a.* "
+ + "FROM database.tablename ORDER BY contain) a WHERE ROWNUM <= 133456) WHERE rnum > 123456";
+ Assert.assertEquals(expected2, sql2);
+
+ String sql3 = db.getSelectStatement("database.tablename", "some(set),of(columns),that,might,contain,methods,a.*", "methods='strange'",
+ "contain", 10000L, 123456L);
+ String expected3 = "SELECT some(set),of(columns),that,might,contain,methods,a.* FROM (SELECT a.*, ROWNUM rnum FROM (SELECT some(set),of(columns),that,might,contain,methods,a.* "
+ + "FROM database.tablename WHERE methods='strange' ORDER BY contain) a WHERE ROWNUM <= 133456) WHERE rnum > 123456";
+ Assert.assertEquals(expected3, sql3);
+
+ String sql4 = db.getSelectStatement("database.tablename", "some(set),of(columns),that,might,contain,methods,a.*", "", "",
+ 100L, null);
+ String expected4 = "SELECT some(set),of(columns),that,might,contain,methods,a.* FROM (SELECT a.*, ROWNUM rnum FROM (SELECT some(set),of(columns),that,might,contain,methods,a.* "
+ + "FROM database.tablename) a WHERE ROWNUM <= 100) WHERE rnum > 0";
+ Assert.assertEquals(expected4, sql4);
+ }
+
+ @Test
+ public void testPagingQueryUsingColumnValuesForPartitioning() {
+ String sql1 = db.getSelectStatement("database.tablename", "some(set),of(columns),that,might,contain,methods,a.*", "1=1", "contain",
+ 100L, 0L, "contain");
+ String expected1 = "SELECT some(set),of(columns),that,might,contain,methods,a.* FROM database.tablename WHERE 1=1 AND contain >= 0 AND contain < 100";
+ Assert.assertEquals(expected1, sql1);
+
+ String sql2 = db.getSelectStatement("database.tablename", "some(set),of(columns),that,might,contain,methods,a.*", "1=1", "contain",
+ 10000L, 123456L, "contain");
+ String expected2 = "SELECT some(set),of(columns),that,might,contain,methods,a.* FROM database.tablename WHERE 1=1 AND contain >= 123456 AND contain < 133456";
+ Assert.assertEquals(expected2, sql2);
+
+ String sql3 = db.getSelectStatement("database.tablename", "some(set),of(columns),that,might,contain,methods,a.*", "methods='strange'",
+ "contain", 10000L, 123456L, "contain");
+ String expected3 = "SELECT some(set),of(columns),that,might,contain,methods,a.* FROM database.tablename WHERE methods='strange' AND contain >= 123456 AND contain < 133456";
+ Assert.assertEquals(expected3, sql3);
+
+ // Paging (limit/offset) is only supported when an orderByClause is supplied, note that it is not honored here
+ String sql4 = db.getSelectStatement("database.tablename", "some(set),of(columns),that,might,contain,methods,a.*", "", "",
+ 100L, null, "contain");
+ String expected4 = "SELECT some(set),of(columns),that,might,contain,methods,a.* FROM database.tablename";
+ Assert.assertEquals(expected4, sql4);
+ }
+}
\ No newline at end of file