You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ex...@apache.org on 2023/06/16 20:05:51 UTC

[nifi] branch main updated: NIFI-11682 Correct Quoting for UPSERT and INSERT_IGNORE keys in PutDatabaseRecord

This is an automated email from the ASF dual-hosted git repository.

exceptionfactory pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new bc6ac4b700 NIFI-11682 Correct Quoting for UPSERT and INSERT_IGNORE keys in PutDatabaseRecord
bc6ac4b700 is described below

commit bc6ac4b700c78961045c8a55b44406827d1a8693
Author: Matt Burgess <ma...@apache.org>
AuthorDate: Thu Jun 15 14:33:53 2023 -0400

    NIFI-11682 Correct Quoting for UPSERT and INSERT_IGNORE keys in PutDatabaseRecord
    
    This closes #7385
    
    Signed-off-by: David Handermann <ex...@apache.org>
---
 .../processors/standard/PutDatabaseRecord.java     | 29 +++++++++++++++++-----
 .../nifi/processors/standard/db/TableSchema.java   |  2 +-
 .../processors/standard/PutDatabaseRecordTest.java |  2 +-
 3 files changed, 25 insertions(+), 8 deletions(-)

diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutDatabaseRecord.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutDatabaseRecord.java
index 671e564d8c..02f36b53cf 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutDatabaseRecord.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutDatabaseRecord.java
@@ -1112,7 +1112,7 @@ public class PutDatabaseRecord extends AbstractProcessor {
         checkValuesForRequiredColumns(recordSchema, tableSchema, settings);
 
         Set<String> keyColumnNames = getUpdateKeyColumnNames(tableName, updateKeys, tableSchema);
-        Set<String> normalizedKeyColumnNames = normalizeKeyColumnNamesAndCheckForValues(recordSchema, updateKeys, settings, keyColumnNames, tableSchema.getQuotedIdentifierString());
+        normalizeKeyColumnNamesAndCheckForValues(recordSchema, updateKeys, settings, keyColumnNames);
 
         List<String> usedColumnNames = new ArrayList<>();
         List<Integer> usedColumnIndices = new ArrayList<>();
@@ -1146,7 +1146,15 @@ public class PutDatabaseRecord extends AbstractProcessor {
             }
         }
 
-        String sql = databaseAdapter.getUpsertStatement(tableName, usedColumnNames, normalizedKeyColumnNames);
+        final Set<String> literalKeyColumnNames = new HashSet<>(keyColumnNames.size());
+        for (String literalKeyColumnName : keyColumnNames) {
+            if (settings.escapeColumnNames) {
+                literalKeyColumnNames.add(tableSchema.getQuotedIdentifierString() + literalKeyColumnName + tableSchema.getQuotedIdentifierString());
+            } else {
+                literalKeyColumnNames.add(literalKeyColumnName);
+            }
+        }
+        String sql = databaseAdapter.getUpsertStatement(tableName, usedColumnNames, literalKeyColumnNames);
 
         return new SqlAndIncludedColumns(sql, usedColumnIndices);
     }
@@ -1158,7 +1166,7 @@ public class PutDatabaseRecord extends AbstractProcessor {
         checkValuesForRequiredColumns(recordSchema, tableSchema, settings);
 
         Set<String> keyColumnNames = getUpdateKeyColumnNames(tableName, updateKeys, tableSchema);
-        Set<String> normalizedKeyColumnNames = normalizeKeyColumnNamesAndCheckForValues(recordSchema, updateKeys, settings, keyColumnNames, tableSchema.getQuotedIdentifierString());
+        normalizeKeyColumnNamesAndCheckForValues(recordSchema, updateKeys, settings, keyColumnNames);
 
         List<String> usedColumnNames = new ArrayList<>();
         List<Integer> usedColumnIndices = new ArrayList<>();
@@ -1192,7 +1200,16 @@ public class PutDatabaseRecord extends AbstractProcessor {
             }
         }
 
-        String sql = databaseAdapter.getInsertIgnoreStatement(tableName, usedColumnNames, normalizedKeyColumnNames);
+        final Set<String> literalKeyColumnNames = new HashSet<>(keyColumnNames.size());
+        for (String literalKeyColumnName : keyColumnNames) {
+            if (settings.escapeColumnNames) {
+                literalKeyColumnNames.add(tableSchema.getQuotedIdentifierString() + literalKeyColumnName + tableSchema.getQuotedIdentifierString());
+            } else {
+                literalKeyColumnNames.add(literalKeyColumnName);
+            }
+        }
+
+        String sql = databaseAdapter.getInsertIgnoreStatement(tableName, usedColumnNames, literalKeyColumnNames);
 
         return new SqlAndIncludedColumns(sql, usedColumnIndices);
     }
@@ -1202,7 +1219,7 @@ public class PutDatabaseRecord extends AbstractProcessor {
             throws IllegalArgumentException, MalformedRecordException, SQLException {
 
         final Set<String> keyColumnNames = getUpdateKeyColumnNames(tableName, updateKeys, tableSchema);
-        final Set<String> normalizedKeyColumnNames = normalizeKeyColumnNamesAndCheckForValues(recordSchema, updateKeys, settings, keyColumnNames, tableSchema.getQuotedIdentifierString());
+        final Set<String> normalizedKeyColumnNames = normalizeKeyColumnNamesAndCheckForValues(recordSchema, updateKeys, settings, keyColumnNames);
 
         final StringBuilder sqlBuilder = new StringBuilder();
         sqlBuilder.append("UPDATE ");
@@ -1411,7 +1428,7 @@ public class PutDatabaseRecord extends AbstractProcessor {
         return updateKeyColumnNames;
     }
 
-    private Set<String> normalizeKeyColumnNamesAndCheckForValues(RecordSchema recordSchema, String updateKeys, DMLSettings settings, Set<String> updateKeyColumnNames, final String quoteString)
+    private Set<String> normalizeKeyColumnNamesAndCheckForValues(RecordSchema recordSchema, String updateKeys, DMLSettings settings, Set<String> updateKeyColumnNames)
             throws MalformedRecordException {
         // Create a Set of all normalized Update Key names, and ensure that there is a field in the record
         // for each of the Update Key fields.
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/db/TableSchema.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/db/TableSchema.java
index 70415dd934..b7429ca776 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/db/TableSchema.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/db/TableSchema.java
@@ -118,7 +118,7 @@ public class TableSchema {
 
                     while (pkrs.next()) {
                         final String colName = pkrs.getString("COLUMN_NAME");
-                        primaryKeyColumns.add(ColumnDescription.normalizeColumnName(colName, translateColumnNames));
+                        primaryKeyColumns.add(colName);
                     }
                 }
             } else {
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/PutDatabaseRecordTest.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/PutDatabaseRecordTest.java
index 903ba2fc92..e3c6600dc6 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/PutDatabaseRecordTest.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/PutDatabaseRecordTest.java
@@ -1113,7 +1113,7 @@ public class PutDatabaseRecordTest {
 
     @Test
     void testUpdateSpecifyQuotedUpdateKeys() throws InitializationException, ProcessException, SQLException {
-        recreateTable("CREATE TABLE PERSONS (\"id\" integer, name varchar(100), code integer)");
+        recreateTable("CREATE TABLE PERSONS (\"id\" integer, \"name\" varchar(100), \"code\" integer)");
         final MockRecordParser parser = new MockRecordParser();
         runner.addControllerService("parser", parser);
         runner.enableControllerService(parser);