You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ex...@apache.org on 2021/07/07 19:52:16 UTC

[nifi] branch main updated: NIFI-8530: Improved DELETE handling in PutDatabaseRecord for non-nullable columns

This is an automated email from the ASF dual-hosted git repository.

exceptionfactory pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new e16016b  NIFI-8530: Improved DELETE handling in PutDatabaseRecord for non-nullable columns
e16016b is described below

commit e16016b4abc771f9e4d62ec9d8a0e626ea9dba08
Author: Matthew Burgess <ma...@apache.org>
AuthorDate: Mon Jun 21 16:26:41 2021 -0400

    NIFI-8530: Improved DELETE handling in PutDatabaseRecord for non-nullable columns
    
    This closes #5173
    
    Signed-off-by: David Handermann <ex...@apache.org>
---
 .../processors/standard/PutDatabaseRecord.java     | 32 ++++++++++++++++------
 .../standard/TestPutDatabaseRecord.groovy          | 20 +++++++-------
 2 files changed, 34 insertions(+), 18 deletions(-)

diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutDatabaseRecord.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutDatabaseRecord.java
index 1fae8c7..4626766 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutDatabaseRecord.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutDatabaseRecord.java
@@ -697,6 +697,7 @@ public class PutDatabaseRecord extends AbstractProcessor {
                     final RecordSchema recordSchema = currentRecord.getSchema();
                     final Map<String, ColumnDescription> columns = tableSchema.getColumns();
 
+                    int deleteIndex = 0;
                     for (int i = 0; i < fieldIndexes.size(); i++) {
                         final int currentFieldIndex = fieldIndexes.get(i);
                         Object currentValue = values[currentFieldIndex];
@@ -762,10 +763,12 @@ public class PutDatabaseRecord extends AbstractProcessor {
                             currentValue = DataTypeUtils.convertDateToLocalTZ((Date) currentValue);
                         }
 
-                        // If DELETE type, insert the object twice because of the null check (see generateDelete for details)
+                        // If DELETE type, insert the object twice if the column is nullable because of the null check (see generateDelete for details)
                         if (DELETE_TYPE.equalsIgnoreCase(statementType)) {
-                            setParameter(ps, i * 2 + 1, currentValue, fieldSqlType, sqlType);
-                            setParameter(ps, i * 2 + 2, currentValue, fieldSqlType, sqlType);
+                            setParameter(ps, ++deleteIndex, currentValue, fieldSqlType, sqlType);
+                            if (column.isNullable()) {
+                                setParameter(ps, ++deleteIndex, currentValue, fieldSqlType, sqlType);
+                            }
                         } else if (UPSERT_TYPE.equalsIgnoreCase(statementType)) {
                             final int timesToAddObjects = databaseAdapter.getTimesToAddColumnObjectsForUpsert();
                             for (int j = 0; j < timesToAddObjects; j++) {
@@ -1284,9 +1287,16 @@ public class PutDatabaseRecord extends AbstractProcessor {
                     //   (column = ? OR (column is null AND ? is null))
                     sqlBuilder.append("(");
                     sqlBuilder.append(columnName);
-                    sqlBuilder.append(" = ? OR (");
-                    sqlBuilder.append(columnName);
-                    sqlBuilder.append(" is null AND ? is null))");
+                    sqlBuilder.append(" = ?");
+
+                    // Only need null check if the column is nullable, otherwise the row wouldn't exist
+                    if (desc.isNullable()) {
+                        sqlBuilder.append(" OR (");
+                        sqlBuilder.append(columnName);
+                        sqlBuilder.append(" is null AND ? is null))");
+                    } else {
+                        sqlBuilder.append(")");
+                    }
                     includedColumns.add(i);
                 } else {
                     // User is ignoring unmapped fields, but log at debug level just in case
@@ -1476,12 +1486,14 @@ public class PutDatabaseRecord extends AbstractProcessor {
         private final int dataType;
         private final boolean required;
         private final Integer columnSize;
+        private final boolean nullable;
 
-        public ColumnDescription(final String columnName, final int dataType, final boolean required, final Integer columnSize) {
+        public ColumnDescription(final String columnName, final int dataType, final boolean required, final Integer columnSize, final boolean nullable) {
             this.columnName = columnName;
             this.dataType = dataType;
             this.required = required;
             this.columnSize = columnSize;
+            this.nullable = nullable;
         }
 
         public int getDataType() {
@@ -1500,6 +1512,10 @@ public class PutDatabaseRecord extends AbstractProcessor {
             return required;
         }
 
+        public boolean isNullable() {
+            return nullable;
+        }
+
         public static ColumnDescription from(final ResultSet resultSet) throws SQLException {
             final ResultSetMetaData md = resultSet.getMetaData();
             List<String> columns = new ArrayList<>();
@@ -1524,7 +1540,7 @@ public class PutDatabaseRecord extends AbstractProcessor {
             final boolean isAutoIncrement = "YES".equalsIgnoreCase(autoIncrementValue);
             final boolean required = !isNullable && !isAutoIncrement && defaultValue == null;
 
-            return new ColumnDescription(columnName, dataType, required, colSize == 0 ? null : colSize);
+            return new ColumnDescription(columnName, dataType, required, colSize == 0 ? null : colSize, isNullable);
         }
 
         @Override
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/groovy/org/apache/nifi/processors/standard/TestPutDatabaseRecord.groovy b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/groovy/org/apache/nifi/processors/standard/TestPutDatabaseRecord.groovy
index d3ad032..623e8d1 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/groovy/org/apache/nifi/processors/standard/TestPutDatabaseRecord.groovy
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/groovy/org/apache/nifi/processors/standard/TestPutDatabaseRecord.groovy
@@ -145,9 +145,9 @@ class TestPutDatabaseRecord {
 
         def tableSchema = [
                 [
-                        new PutDatabaseRecord.ColumnDescription('id', 4, true, 2),
-                        new PutDatabaseRecord.ColumnDescription('name', 12, true, 255),
-                        new PutDatabaseRecord.ColumnDescription('code', 4, true, 10)
+                        new PutDatabaseRecord.ColumnDescription('id', 4, true, 2, false),
+                        new PutDatabaseRecord.ColumnDescription('name', 12, true, 255, true),
+                        new PutDatabaseRecord.ColumnDescription('code', 4, true, 10, true)
                 ],
                 false,
                 ['id'] as Set<String>,
@@ -169,7 +169,7 @@ class TestPutDatabaseRecord {
             assertEquals('UPDATE PERSONS SET name = ?, code = ? WHERE id = ?',
                     generateUpdate(schema, 'PERSONS', null, tableSchema, settings).sql)
 
-            assertEquals('DELETE FROM PERSONS WHERE (id = ? OR (id is null AND ? is null)) AND (name = ? OR (name is null AND ? is null)) AND (code = ? OR (code is null AND ? is null))',
+            assertEquals('DELETE FROM PERSONS WHERE (id = ?) AND (name = ? OR (name is null AND ? is null)) AND (code = ? OR (code is null AND ? is null))',
                     generateDelete(schema, 'PERSONS', tableSchema, settings).sql)
         }
     }
@@ -193,9 +193,9 @@ class TestPutDatabaseRecord {
 
         def tableSchema = [
                 [
-                        new PutDatabaseRecord.ColumnDescription('id', 4, true, 2),
-                        new PutDatabaseRecord.ColumnDescription('name', 12, true, 255),
-                        new PutDatabaseRecord.ColumnDescription('code', 4, true, 10)
+                        new PutDatabaseRecord.ColumnDescription('id', 4, true, 2, false),
+                        new PutDatabaseRecord.ColumnDescription('name', 12, true, 255, true),
+                        new PutDatabaseRecord.ColumnDescription('code', 4, true, 10, true)
                 ],
                 false,
                 ['id'] as Set<String>,
@@ -1236,9 +1236,9 @@ class TestPutDatabaseRecord {
 
         def tableSchema = [
                 [
-                        new PutDatabaseRecord.ColumnDescription('id', 4, true, 2),
-                        new PutDatabaseRecord.ColumnDescription('name', 12, true, 255),
-                        new PutDatabaseRecord.ColumnDescription('code', 4, true, 10)
+                        new PutDatabaseRecord.ColumnDescription('id', 4, true, 2, false),
+                        new PutDatabaseRecord.ColumnDescription('name', 12, true, 255, true),
+                        new PutDatabaseRecord.ColumnDescription('code', 4, true, 10, true)
                 ],
                 false,
                 ['id'] as Set<String>,