You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ex...@apache.org on 2021/07/07 19:52:16 UTC
[nifi] branch main updated: NIFI-8530: Improved DELETE handling in
PutDatabaseRecord for non-nullable columns
This is an automated email from the ASF dual-hosted git repository.
exceptionfactory pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new e16016b NIFI-8530: Improved DELETE handling in PutDatabaseRecord for non-nullable columns
e16016b is described below
commit e16016b4abc771f9e4d62ec9d8a0e626ea9dba08
Author: Matthew Burgess <ma...@apache.org>
AuthorDate: Mon Jun 21 16:26:41 2021 -0400
NIFI-8530: Improved DELETE handling in PutDatabaseRecord for non-nullable columns
This closes #5173
Signed-off-by: David Handermann <ex...@apache.org>
---
.../processors/standard/PutDatabaseRecord.java | 32 ++++++++++++++++------
.../standard/TestPutDatabaseRecord.groovy | 20 +++++++-------
2 files changed, 34 insertions(+), 18 deletions(-)
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutDatabaseRecord.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutDatabaseRecord.java
index 1fae8c7..4626766 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutDatabaseRecord.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutDatabaseRecord.java
@@ -697,6 +697,7 @@ public class PutDatabaseRecord extends AbstractProcessor {
final RecordSchema recordSchema = currentRecord.getSchema();
final Map<String, ColumnDescription> columns = tableSchema.getColumns();
+ int deleteIndex = 0;
for (int i = 0; i < fieldIndexes.size(); i++) {
final int currentFieldIndex = fieldIndexes.get(i);
Object currentValue = values[currentFieldIndex];
@@ -762,10 +763,12 @@ public class PutDatabaseRecord extends AbstractProcessor {
currentValue = DataTypeUtils.convertDateToLocalTZ((Date) currentValue);
}
- // If DELETE type, insert the object twice because of the null check (see generateDelete for details)
+ // If DELETE type, insert the object twice if the column is nullable because of the null check (see generateDelete for details)
if (DELETE_TYPE.equalsIgnoreCase(statementType)) {
- setParameter(ps, i * 2 + 1, currentValue, fieldSqlType, sqlType);
- setParameter(ps, i * 2 + 2, currentValue, fieldSqlType, sqlType);
+ setParameter(ps, ++deleteIndex, currentValue, fieldSqlType, sqlType);
+ if (column.isNullable()) {
+ setParameter(ps, ++deleteIndex, currentValue, fieldSqlType, sqlType);
+ }
} else if (UPSERT_TYPE.equalsIgnoreCase(statementType)) {
final int timesToAddObjects = databaseAdapter.getTimesToAddColumnObjectsForUpsert();
for (int j = 0; j < timesToAddObjects; j++) {
@@ -1284,9 +1287,16 @@ public class PutDatabaseRecord extends AbstractProcessor {
// (column = ? OR (column is null AND ? is null))
sqlBuilder.append("(");
sqlBuilder.append(columnName);
- sqlBuilder.append(" = ? OR (");
- sqlBuilder.append(columnName);
- sqlBuilder.append(" is null AND ? is null))");
+ sqlBuilder.append(" = ?");
+
+ // Only need null check if the column is nullable, otherwise the row wouldn't exist
+ if (desc.isNullable()) {
+ sqlBuilder.append(" OR (");
+ sqlBuilder.append(columnName);
+ sqlBuilder.append(" is null AND ? is null))");
+ } else {
+ sqlBuilder.append(")");
+ }
includedColumns.add(i);
} else {
// User is ignoring unmapped fields, but log at debug level just in case
@@ -1476,12 +1486,14 @@ public class PutDatabaseRecord extends AbstractProcessor {
private final int dataType;
private final boolean required;
private final Integer columnSize;
+ private final boolean nullable;
- public ColumnDescription(final String columnName, final int dataType, final boolean required, final Integer columnSize) {
+ public ColumnDescription(final String columnName, final int dataType, final boolean required, final Integer columnSize, final boolean nullable) {
this.columnName = columnName;
this.dataType = dataType;
this.required = required;
this.columnSize = columnSize;
+ this.nullable = nullable;
}
public int getDataType() {
@@ -1500,6 +1512,10 @@ public class PutDatabaseRecord extends AbstractProcessor {
return required;
}
+ public boolean isNullable() {
+ return nullable;
+ }
+
public static ColumnDescription from(final ResultSet resultSet) throws SQLException {
final ResultSetMetaData md = resultSet.getMetaData();
List<String> columns = new ArrayList<>();
@@ -1524,7 +1540,7 @@ public class PutDatabaseRecord extends AbstractProcessor {
final boolean isAutoIncrement = "YES".equalsIgnoreCase(autoIncrementValue);
final boolean required = !isNullable && !isAutoIncrement && defaultValue == null;
- return new ColumnDescription(columnName, dataType, required, colSize == 0 ? null : colSize);
+ return new ColumnDescription(columnName, dataType, required, colSize == 0 ? null : colSize, isNullable);
}
@Override
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/groovy/org/apache/nifi/processors/standard/TestPutDatabaseRecord.groovy b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/groovy/org/apache/nifi/processors/standard/TestPutDatabaseRecord.groovy
index d3ad032..623e8d1 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/groovy/org/apache/nifi/processors/standard/TestPutDatabaseRecord.groovy
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/groovy/org/apache/nifi/processors/standard/TestPutDatabaseRecord.groovy
@@ -145,9 +145,9 @@ class TestPutDatabaseRecord {
def tableSchema = [
[
- new PutDatabaseRecord.ColumnDescription('id', 4, true, 2),
- new PutDatabaseRecord.ColumnDescription('name', 12, true, 255),
- new PutDatabaseRecord.ColumnDescription('code', 4, true, 10)
+ new PutDatabaseRecord.ColumnDescription('id', 4, true, 2, false),
+ new PutDatabaseRecord.ColumnDescription('name', 12, true, 255, true),
+ new PutDatabaseRecord.ColumnDescription('code', 4, true, 10, true)
],
false,
['id'] as Set<String>,
@@ -169,7 +169,7 @@ class TestPutDatabaseRecord {
assertEquals('UPDATE PERSONS SET name = ?, code = ? WHERE id = ?',
generateUpdate(schema, 'PERSONS', null, tableSchema, settings).sql)
- assertEquals('DELETE FROM PERSONS WHERE (id = ? OR (id is null AND ? is null)) AND (name = ? OR (name is null AND ? is null)) AND (code = ? OR (code is null AND ? is null))',
+ assertEquals('DELETE FROM PERSONS WHERE (id = ?) AND (name = ? OR (name is null AND ? is null)) AND (code = ? OR (code is null AND ? is null))',
generateDelete(schema, 'PERSONS', tableSchema, settings).sql)
}
}
@@ -193,9 +193,9 @@ class TestPutDatabaseRecord {
def tableSchema = [
[
- new PutDatabaseRecord.ColumnDescription('id', 4, true, 2),
- new PutDatabaseRecord.ColumnDescription('name', 12, true, 255),
- new PutDatabaseRecord.ColumnDescription('code', 4, true, 10)
+ new PutDatabaseRecord.ColumnDescription('id', 4, true, 2, false),
+ new PutDatabaseRecord.ColumnDescription('name', 12, true, 255, true),
+ new PutDatabaseRecord.ColumnDescription('code', 4, true, 10, true)
],
false,
['id'] as Set<String>,
@@ -1236,9 +1236,9 @@ class TestPutDatabaseRecord {
def tableSchema = [
[
- new PutDatabaseRecord.ColumnDescription('id', 4, true, 2),
- new PutDatabaseRecord.ColumnDescription('name', 12, true, 255),
- new PutDatabaseRecord.ColumnDescription('code', 4, true, 10)
+ new PutDatabaseRecord.ColumnDescription('id', 4, true, 2, false),
+ new PutDatabaseRecord.ColumnDescription('name', 12, true, 255, true),
+ new PutDatabaseRecord.ColumnDescription('code', 4, true, 10, true)
],
false,
['id'] as Set<String>,