You are viewing a plain text version of this content. The canonical link for it was not preserved in this plain text version.
Posted to commits@nifi.apache.org by ex...@apache.org on 2023/06/16 20:05:51 UTC
[nifi] branch main updated: NIFI-11682 Correct Quoting for UPSERT and INSERT_IGNORE keys in PutDatabaseRecord
This is an automated email from the ASF dual-hosted git repository.
exceptionfactory pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new bc6ac4b700 NIFI-11682 Correct Quoting for UPSERT and INSERT_IGNORE keys in PutDatabaseRecord
bc6ac4b700 is described below
commit bc6ac4b700c78961045c8a55b44406827d1a8693
Author: Matt Burgess <ma...@apache.org>
AuthorDate: Thu Jun 15 14:33:53 2023 -0400
NIFI-11682 Correct Quoting for UPSERT and INSERT_IGNORE keys in PutDatabaseRecord
This closes #7385
Signed-off-by: David Handermann <ex...@apache.org>
---
.../processors/standard/PutDatabaseRecord.java | 29 +++++++++++++++++-----
.../nifi/processors/standard/db/TableSchema.java | 2 +-
.../processors/standard/PutDatabaseRecordTest.java | 2 +-
3 files changed, 25 insertions(+), 8 deletions(-)
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutDatabaseRecord.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutDatabaseRecord.java
index 671e564d8c..02f36b53cf 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutDatabaseRecord.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutDatabaseRecord.java
@@ -1112,7 +1112,7 @@ public class PutDatabaseRecord extends AbstractProcessor {
checkValuesForRequiredColumns(recordSchema, tableSchema, settings);
Set<String> keyColumnNames = getUpdateKeyColumnNames(tableName, updateKeys, tableSchema);
- Set<String> normalizedKeyColumnNames = normalizeKeyColumnNamesAndCheckForValues(recordSchema, updateKeys, settings, keyColumnNames, tableSchema.getQuotedIdentifierString());
+ normalizeKeyColumnNamesAndCheckForValues(recordSchema, updateKeys, settings, keyColumnNames);
List<String> usedColumnNames = new ArrayList<>();
List<Integer> usedColumnIndices = new ArrayList<>();
@@ -1146,7 +1146,15 @@ public class PutDatabaseRecord extends AbstractProcessor {
}
}
- String sql = databaseAdapter.getUpsertStatement(tableName, usedColumnNames, normalizedKeyColumnNames);
+ final Set<String> literalKeyColumnNames = new HashSet<>(keyColumnNames.size());
+ for (String literalKeyColumnName : keyColumnNames) {
+ if (settings.escapeColumnNames) {
+ literalKeyColumnNames.add(tableSchema.getQuotedIdentifierString() + literalKeyColumnName + tableSchema.getQuotedIdentifierString());
+ } else {
+ literalKeyColumnNames.add(literalKeyColumnName);
+ }
+ }
+ String sql = databaseAdapter.getUpsertStatement(tableName, usedColumnNames, literalKeyColumnNames);
return new SqlAndIncludedColumns(sql, usedColumnIndices);
}
@@ -1158,7 +1166,7 @@ public class PutDatabaseRecord extends AbstractProcessor {
checkValuesForRequiredColumns(recordSchema, tableSchema, settings);
Set<String> keyColumnNames = getUpdateKeyColumnNames(tableName, updateKeys, tableSchema);
- Set<String> normalizedKeyColumnNames = normalizeKeyColumnNamesAndCheckForValues(recordSchema, updateKeys, settings, keyColumnNames, tableSchema.getQuotedIdentifierString());
+ normalizeKeyColumnNamesAndCheckForValues(recordSchema, updateKeys, settings, keyColumnNames);
List<String> usedColumnNames = new ArrayList<>();
List<Integer> usedColumnIndices = new ArrayList<>();
@@ -1192,7 +1200,16 @@ public class PutDatabaseRecord extends AbstractProcessor {
}
}
- String sql = databaseAdapter.getInsertIgnoreStatement(tableName, usedColumnNames, normalizedKeyColumnNames);
+ final Set<String> literalKeyColumnNames = new HashSet<>(keyColumnNames.size());
+ for (String literalKeyColumnName : keyColumnNames) {
+ if (settings.escapeColumnNames) {
+ literalKeyColumnNames.add(tableSchema.getQuotedIdentifierString() + literalKeyColumnName + tableSchema.getQuotedIdentifierString());
+ } else {
+ literalKeyColumnNames.add(literalKeyColumnName);
+ }
+ }
+
+ String sql = databaseAdapter.getInsertIgnoreStatement(tableName, usedColumnNames, literalKeyColumnNames);
return new SqlAndIncludedColumns(sql, usedColumnIndices);
}
@@ -1202,7 +1219,7 @@ public class PutDatabaseRecord extends AbstractProcessor {
throws IllegalArgumentException, MalformedRecordException, SQLException {
final Set<String> keyColumnNames = getUpdateKeyColumnNames(tableName, updateKeys, tableSchema);
- final Set<String> normalizedKeyColumnNames = normalizeKeyColumnNamesAndCheckForValues(recordSchema, updateKeys, settings, keyColumnNames, tableSchema.getQuotedIdentifierString());
+ final Set<String> normalizedKeyColumnNames = normalizeKeyColumnNamesAndCheckForValues(recordSchema, updateKeys, settings, keyColumnNames);
final StringBuilder sqlBuilder = new StringBuilder();
sqlBuilder.append("UPDATE ");
@@ -1411,7 +1428,7 @@ public class PutDatabaseRecord extends AbstractProcessor {
return updateKeyColumnNames;
}
- private Set<String> normalizeKeyColumnNamesAndCheckForValues(RecordSchema recordSchema, String updateKeys, DMLSettings settings, Set<String> updateKeyColumnNames, final String quoteString)
+ private Set<String> normalizeKeyColumnNamesAndCheckForValues(RecordSchema recordSchema, String updateKeys, DMLSettings settings, Set<String> updateKeyColumnNames)
throws MalformedRecordException {
// Create a Set of all normalized Update Key names, and ensure that there is a field in the record
// for each of the Update Key fields.
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/db/TableSchema.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/db/TableSchema.java
index 70415dd934..b7429ca776 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/db/TableSchema.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/db/TableSchema.java
@@ -118,7 +118,7 @@ public class TableSchema {
while (pkrs.next()) {
final String colName = pkrs.getString("COLUMN_NAME");
- primaryKeyColumns.add(ColumnDescription.normalizeColumnName(colName, translateColumnNames));
+ primaryKeyColumns.add(colName);
}
}
} else {
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/PutDatabaseRecordTest.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/PutDatabaseRecordTest.java
index 903ba2fc92..e3c6600dc6 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/PutDatabaseRecordTest.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/PutDatabaseRecordTest.java
@@ -1113,7 +1113,7 @@ public class PutDatabaseRecordTest {
@Test
void testUpdateSpecifyQuotedUpdateKeys() throws InitializationException, ProcessException, SQLException {
- recreateTable("CREATE TABLE PERSONS (\"id\" integer, name varchar(100), code integer)");
+ recreateTable("CREATE TABLE PERSONS (\"id\" integer, \"name\" varchar(100), \"code\" integer)");
final MockRecordParser parser = new MockRecordParser();
runner.addControllerService("parser", parser);
runner.enableControllerService(parser);