You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by th...@apache.org on 2022/02/16 21:55:54 UTC
[nifi] branch main updated: NIFI-9607: Honor Update Keys when Quoting Identifiers in PutDatabaseRecord
This is an automated email from the ASF dual-hosted git repository.
thenatog pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new 03165ad NIFI-9607: Honor Update Keys when Quoting Identifiers in PutDatabaseRecord
03165ad is described below
commit 03165ad817bae0c981f82e92c5b916324da5a5bf
Author: Matthew Burgess <ma...@apache.org>
AuthorDate: Fri Jan 21 16:52:11 2022 -0500
NIFI-9607: Honor Update Keys when Quoting Identifiers in PutDatabaseRecord
NIFI-9607: Fixed wrong column name in WHERE clause for generateUpdate
Signed-off-by: Nathan Gough <th...@gmail.com>
This closes #5701.
---
.../processors/standard/PutDatabaseRecord.java | 30 ++++++--------
.../standard/TestPutDatabaseRecord.groovy | 47 ++++++++++++++++++++++
2 files changed, 60 insertions(+), 17 deletions(-)
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutDatabaseRecord.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutDatabaseRecord.java
index bc8a1f0..4756d80 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutDatabaseRecord.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutDatabaseRecord.java
@@ -603,14 +603,10 @@ public class PutDatabaseRecord extends AbstractProcessor {
throw new IllegalArgumentException(format("Cannot process %s because Table Name is null or empty", flowFile));
}
- // Always get the primary keys if Update Keys is empty. Otherwise if we have an Insert statement first, the table will be
- // cached but the primary keys will not be retrieved, causing future UPDATE statements to not have primary keys available
- final boolean includePrimaryKeys = updateKeys == null;
-
final SchemaKey schemaKey = new PutDatabaseRecord.SchemaKey(catalog, schemaName, tableName);
final TableSchema tableSchema = schemaCache.get(schemaKey, key -> {
try {
- final TableSchema schema = TableSchema.from(con, catalog, schemaName, tableName, settings.translateFieldNames, includePrimaryKeys, log);
+ final TableSchema schema = TableSchema.from(con, catalog, schemaName, tableName, settings.translateFieldNames, updateKeys, log);
getLogger().debug("Fetched Table Schema {} for table name {}", schema, tableName);
return schema;
} catch (SQLException e) {
@@ -1189,12 +1185,8 @@ public class PutDatabaseRecord extends AbstractProcessor {
}
}
- // Set the WHERE clause based on the Update Key values
- sqlBuilder.append(" WHERE ");
AtomicInteger whereFieldCount = new AtomicInteger(0);
-
for (int i = 0; i < fieldCount; i++) {
-
RecordField field = recordSchema.getField(i);
String fieldName = field.getFieldName();
@@ -1207,14 +1199,17 @@ public class PutDatabaseRecord extends AbstractProcessor {
if (whereFieldCount.getAndIncrement() > 0) {
sqlBuilder.append(" AND ");
+ } else if (i == 0) {
+ // Set the WHERE clause based on the Update Key values
+ sqlBuilder.append(" WHERE ");
}
if (settings.escapeColumnNames) {
sqlBuilder.append(tableSchema.getQuotedIdentifierString())
- .append(normalizedColName)
+ .append(desc.getColumnName())
.append(tableSchema.getQuotedIdentifierString());
} else {
- sqlBuilder.append(normalizedColName);
+ sqlBuilder.append(desc.getColumnName());
}
sqlBuilder.append(" = ?");
includedColumns.add(i);
@@ -1363,10 +1358,6 @@ public class PutDatabaseRecord extends AbstractProcessor {
getLogger().warn(missingColMessage);
}
}
- // Optionally quote the name before returning
- if (settings.escapeColumnNames) {
- normalizedKeyColumnName = quoteString + normalizedKeyColumnName + quoteString;
- }
normalizedKeyColumnNames.add(normalizedKeyColumnName);
}
@@ -1419,7 +1410,7 @@ public class PutDatabaseRecord extends AbstractProcessor {
}
public static TableSchema from(final Connection conn, final String catalog, final String schema, final String tableName,
- final boolean translateColumnNames, final boolean includePrimaryKeys, ComponentLog log) throws SQLException {
+ final boolean translateColumnNames, final String updateKeys, ComponentLog log) throws SQLException {
final DatabaseMetaData dmd = conn.getMetaData();
try (final ResultSet colrs = dmd.getColumns(catalog, schema, tableName, "%")) {
@@ -1455,7 +1446,7 @@ public class PutDatabaseRecord extends AbstractProcessor {
}
final Set<String> primaryKeyColumns = new HashSet<>();
- if (includePrimaryKeys) {
+ if (updateKeys == null) {
try (final ResultSet pkrs = dmd.getPrimaryKeys(catalog, schema, tableName)) {
while (pkrs.next()) {
@@ -1463,6 +1454,11 @@ public class PutDatabaseRecord extends AbstractProcessor {
primaryKeyColumns.add(normalizeColumnName(colName, translateColumnNames));
}
}
+ } else {
+ // Parse the Update Keys field and normalize the column names
+ for (final String updateKey : updateKeys.split(",")) {
+ primaryKeyColumns.add(normalizeColumnName(updateKey.trim(), translateColumnNames));
+ }
}
return new TableSchema(cols, translateColumnNames, primaryKeyColumns, dmd.getIdentifierQuoteString());
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/groovy/org/apache/nifi/processors/standard/TestPutDatabaseRecord.groovy b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/groovy/org/apache/nifi/processors/standard/TestPutDatabaseRecord.groovy
index d7817ff..04f0460 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/groovy/org/apache/nifi/processors/standard/TestPutDatabaseRecord.groovy
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/groovy/org/apache/nifi/processors/standard/TestPutDatabaseRecord.groovy
@@ -967,6 +967,53 @@ class TestPutDatabaseRecord {
}
@Test
+ void testUpdateSpecifyQuotedUpdateKeys() throws InitializationException, ProcessException, SQLException, IOException {
+ recreateTable('CREATE TABLE PERSONS ("id" integer, name varchar(100), code integer)')
+ final MockRecordParser parser = new MockRecordParser()
+ runner.addControllerService("parser", parser)
+ runner.enableControllerService(parser)
+
+ parser.addSchemaField("id", RecordFieldType.INT)
+ parser.addSchemaField("name", RecordFieldType.STRING)
+ parser.addSchemaField("code", RecordFieldType.INT)
+
+ parser.addRecord(1, 'rec1', 201)
+ parser.addRecord(2, 'rec2', 202)
+
+ runner.setProperty(PutDatabaseRecord.RECORD_READER_FACTORY, 'parser')
+ runner.setProperty(PutDatabaseRecord.STATEMENT_TYPE, PutDatabaseRecord.UPDATE_TYPE)
+ runner.setProperty(PutDatabaseRecord.UPDATE_KEYS, '${updateKey}')
+ runner.setProperty(PutDatabaseRecord.TABLE_NAME, 'PERSONS')
+ runner.setProperty(PutDatabaseRecord.QUOTE_IDENTIFIERS, 'true')
+
+ // Set some existing records with different values for name and code
+ final Connection conn = dbcp.getConnection()
+ Statement stmt = conn.createStatement()
+ stmt.execute('''INSERT INTO PERSONS VALUES (1,'x1',101)''')
+ stmt.execute('''INSERT INTO PERSONS VALUES (2,'x2',102)''')
+ stmt.close()
+
+ runner.enqueue(new byte[0], ['updateKey': 'id'])
+ runner.run()
+
+ runner.assertTransferCount(PutDatabaseRecord.REL_SUCCESS, 1)
+ stmt = conn.createStatement()
+ final ResultSet rs = stmt.executeQuery('SELECT * FROM PERSONS')
+ assertTrue(rs.next())
+ assertEquals(1, rs.getInt(1))
+ assertEquals('rec1', rs.getString(2))
+ assertEquals(201, rs.getInt(3))
+ assertTrue(rs.next())
+ assertEquals(2, rs.getInt(1))
+ assertEquals('rec2', rs.getString(2))
+ assertEquals(202, rs.getInt(3))
+ assertFalse(rs.next())
+
+ stmt.close()
+ conn.close()
+ }
+
+ @Test
void testDelete() throws InitializationException, ProcessException, SQLException, IOException {
recreateTable(createPersons)
Connection conn = dbcp.getConnection()