Posted to commits@nifi.apache.org by th...@apache.org on 2022/02/16 21:55:54 UTC

[nifi] branch main updated: NIFI-9607: Honor Update Keys when Quoting Identifiers in PutDatabaseRecord

This is an automated email from the ASF dual-hosted git repository.

thenatog pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new 03165ad  NIFI-9607: Honor Update Keys when Quoting Identifiers in PutDatabaseRecord
03165ad is described below

commit 03165ad817bae0c981f82e92c5b916324da5a5bf
Author: Matthew Burgess <ma...@apache.org>
AuthorDate: Fri Jan 21 16:52:11 2022 -0500

    NIFI-9607: Honor Update Keys when Quoting Identifiers in PutDatabaseRecord
    
    NIFI-9607: Fixed wrong column name in WHERE clause for generateUpdate
    
    Signed-off-by: Nathan Gough <th...@gmail.com>
    
    This closes #5701.
---
 .../processors/standard/PutDatabaseRecord.java     | 30 ++++++--------
 .../standard/TestPutDatabaseRecord.groovy          | 47 ++++++++++++++++++++++
 2 files changed, 60 insertions(+), 17 deletions(-)
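
As a point of reference for the diff below: when a non-empty Update Keys value is supplied, TableSchema.from now receives the raw property value and folds the comma-separated column names into the same key-column set that is otherwise filled from the table's primary keys. The standalone sketch below is only an illustration (UpdateKeysSketch and its methods are hypothetical, not NiFi code, and the normalization shown -- upper-casing and stripping underscores when translation is enabled -- is an assumption about normalizeColumnName's behaviour):

    import java.util.HashSet;
    import java.util.Set;

    // Hypothetical, standalone sketch: folding a comma-separated Update Keys value into a
    // key-column set, roughly mirroring the new branch in TableSchema.from. The
    // normalization rule here is an assumption made for illustration only.
    class UpdateKeysSketch {

        static String normalizeColumnName(final String colName, final boolean translateColumnNames) {
            return translateColumnNames ? colName.toUpperCase().replace("_", "") : colName;
        }

        static Set<String> keyColumns(final String updateKeys, final boolean translateColumnNames) {
            final Set<String> primaryKeyColumns = new HashSet<>();
            // Parse the Update Keys field and normalize the column names
            for (final String updateKey : updateKeys.split(",")) {
                primaryKeyColumns.add(normalizeColumnName(updateKey.trim(), translateColumnNames));
            }
            return primaryKeyColumns;
        }

        public static void main(String[] args) {
            System.out.println(keyColumns("id, employee_code", true));   // e.g. [ID, EMPLOYEECODE]
            System.out.println(keyColumns("id, employee_code", false));  // e.g. [id, employee_code]
        }
    }
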

diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutDatabaseRecord.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutDatabaseRecord.java
index bc8a1f0..4756d80 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutDatabaseRecord.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutDatabaseRecord.java
@@ -603,14 +603,10 @@ public class PutDatabaseRecord extends AbstractProcessor {
             throw new IllegalArgumentException(format("Cannot process %s because Table Name is null or empty", flowFile));
         }
 
-        // Always get the primary keys if Update Keys is empty. Otherwise if we have an Insert statement first, the table will be
-        // cached but the primary keys will not be retrieved, causing future UPDATE statements to not have primary keys available
-        final boolean includePrimaryKeys = updateKeys == null;
-
         final SchemaKey schemaKey = new PutDatabaseRecord.SchemaKey(catalog, schemaName, tableName);
         final TableSchema tableSchema = schemaCache.get(schemaKey, key -> {
             try {
-                final TableSchema schema = TableSchema.from(con, catalog, schemaName, tableName, settings.translateFieldNames, includePrimaryKeys, log);
+                final TableSchema schema = TableSchema.from(con, catalog, schemaName, tableName, settings.translateFieldNames, updateKeys, log);
                 getLogger().debug("Fetched Table Schema {} for table name {}", schema, tableName);
                 return schema;
             } catch (SQLException e) {
@@ -1189,12 +1185,8 @@ public class PutDatabaseRecord extends AbstractProcessor {
                 }
             }
 
-            // Set the WHERE clause based on the Update Key values
-            sqlBuilder.append(" WHERE ");
             AtomicInteger whereFieldCount = new AtomicInteger(0);
-
             for (int i = 0; i < fieldCount; i++) {
-
                 RecordField field = recordSchema.getField(i);
                 String fieldName = field.getFieldName();
 
@@ -1207,14 +1199,17 @@ public class PutDatabaseRecord extends AbstractProcessor {
 
                         if (whereFieldCount.getAndIncrement() > 0) {
                             sqlBuilder.append(" AND ");
+                        } else if (i == 0) {
+                            // Set the WHERE clause based on the Update Key values
+                            sqlBuilder.append(" WHERE ");
                         }
 
                         if (settings.escapeColumnNames) {
                             sqlBuilder.append(tableSchema.getQuotedIdentifierString())
-                                    .append(normalizedColName)
+                                    .append(desc.getColumnName())
                                     .append(tableSchema.getQuotedIdentifierString());
                         } else {
-                            sqlBuilder.append(normalizedColName);
+                            sqlBuilder.append(desc.getColumnName());
                         }
                         sqlBuilder.append(" = ?");
                         includedColumns.add(i);
@@ -1363,10 +1358,6 @@ public class PutDatabaseRecord extends AbstractProcessor {
                     getLogger().warn(missingColMessage);
                 }
             }
-            // Optionally quote the name before returning
-            if (settings.escapeColumnNames) {
-                normalizedKeyColumnName = quoteString + normalizedKeyColumnName + quoteString;
-            }
             normalizedKeyColumnNames.add(normalizedKeyColumnName);
         }
 
@@ -1419,7 +1410,7 @@ public class PutDatabaseRecord extends AbstractProcessor {
         }
 
         public static TableSchema from(final Connection conn, final String catalog, final String schema, final String tableName,
-                                       final boolean translateColumnNames, final boolean includePrimaryKeys, ComponentLog log) throws SQLException {
+                                       final boolean translateColumnNames, final String updateKeys, ComponentLog log) throws SQLException {
             final DatabaseMetaData dmd = conn.getMetaData();
 
             try (final ResultSet colrs = dmd.getColumns(catalog, schema, tableName, "%")) {
@@ -1455,7 +1446,7 @@ public class PutDatabaseRecord extends AbstractProcessor {
                 }
 
                 final Set<String> primaryKeyColumns = new HashSet<>();
-                if (includePrimaryKeys) {
+                if (updateKeys == null) {
                     try (final ResultSet pkrs = dmd.getPrimaryKeys(catalog, schema, tableName)) {
 
                         while (pkrs.next()) {
@@ -1463,6 +1454,11 @@ public class PutDatabaseRecord extends AbstractProcessor {
                             primaryKeyColumns.add(normalizeColumnName(colName, translateColumnNames));
                         }
                     }
+                } else {
+                    // Parse the Update Keys field and normalize the column names
+                    for (final String updateKey : updateKeys.split(",")) {
+                        primaryKeyColumns.add(normalizeColumnName(updateKey.trim(), translateColumnNames));
+                    }
                 }
 
                 return new TableSchema(cols, translateColumnNames, primaryKeyColumns, dmd.getIdentifierQuoteString());
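
The WHERE-clause hunks above amount to two things: the WHERE keyword is appended only once the first key column is reached, and when identifiers are quoted the quote characters wrap the column's actual name (desc.getColumnName()) rather than the normalized lookup name. The following is a loose, standalone approximation under those assumptions (WhereClauseSketch and whereClause are hypothetical names; this is a simplification, not the actual generateUpdate code):

    import java.util.List;

    // Hypothetical sketch: building the WHERE clause of an UPDATE from the key columns,
    // quoting each column's actual name when identifier escaping is enabled.
    class WhereClauseSketch {

        static String whereClause(final List<String> keyColumnNames, final String quoteString,
                                  final boolean escapeColumnNames) {
            final StringBuilder sqlBuilder = new StringBuilder();
            int whereFieldCount = 0;
            for (final String columnName : keyColumnNames) {
                if (whereFieldCount++ > 0) {
                    sqlBuilder.append(" AND ");
                } else {
                    // Set the WHERE clause based on the Update Key values
                    sqlBuilder.append(" WHERE ");
                }
                if (escapeColumnNames) {
                    // Quote the column's actual name so identifiers such as "id" match the DDL
                    sqlBuilder.append(quoteString).append(columnName).append(quoteString);
                } else {
                    sqlBuilder.append(columnName);
                }
                sqlBuilder.append(" = ?");
            }
            return sqlBuilder.toString();
        }

        public static void main(String[] args) {
            // With Update Keys = id and a double-quote identifier (as in the new Derby test below):
            System.out.println(whereClause(List.of("id"), "\"", true));          //  WHERE "id" = ?
            System.out.println(whereClause(List.of("id", "code"), "\"", true));  //  WHERE "id" = ? AND "code" = ?
        }
    }
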
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/groovy/org/apache/nifi/processors/standard/TestPutDatabaseRecord.groovy b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/groovy/org/apache/nifi/processors/standard/TestPutDatabaseRecord.groovy
index d7817ff..04f0460 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/groovy/org/apache/nifi/processors/standard/TestPutDatabaseRecord.groovy
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/groovy/org/apache/nifi/processors/standard/TestPutDatabaseRecord.groovy
@@ -967,6 +967,53 @@ class TestPutDatabaseRecord {
     }
 
     @Test
+    void testUpdateSpecifyQuotedUpdateKeys() throws InitializationException, ProcessException, SQLException, IOException {
+        recreateTable('CREATE TABLE PERSONS ("id" integer, name varchar(100), code integer)')
+        final MockRecordParser parser = new MockRecordParser()
+        runner.addControllerService("parser", parser)
+        runner.enableControllerService(parser)
+
+        parser.addSchemaField("id", RecordFieldType.INT)
+        parser.addSchemaField("name", RecordFieldType.STRING)
+        parser.addSchemaField("code", RecordFieldType.INT)
+
+        parser.addRecord(1, 'rec1', 201)
+        parser.addRecord(2, 'rec2', 202)
+
+        runner.setProperty(PutDatabaseRecord.RECORD_READER_FACTORY, 'parser')
+        runner.setProperty(PutDatabaseRecord.STATEMENT_TYPE, PutDatabaseRecord.UPDATE_TYPE)
+        runner.setProperty(PutDatabaseRecord.UPDATE_KEYS, '${updateKey}')
+        runner.setProperty(PutDatabaseRecord.TABLE_NAME, 'PERSONS')
+        runner.setProperty(PutDatabaseRecord.QUOTE_IDENTIFIERS, 'true')
+
+        // Set some existing records with different values for name and code
+        final Connection conn = dbcp.getConnection()
+        Statement stmt = conn.createStatement()
+        stmt.execute('''INSERT INTO PERSONS VALUES (1,'x1',101)''')
+        stmt.execute('''INSERT INTO PERSONS VALUES (2,'x2',102)''')
+        stmt.close()
+
+        runner.enqueue(new byte[0], ['updateKey': 'id'])
+        runner.run()
+
+        runner.assertTransferCount(PutDatabaseRecord.REL_SUCCESS, 1)
+        stmt = conn.createStatement()
+        final ResultSet rs = stmt.executeQuery('SELECT * FROM PERSONS')
+        assertTrue(rs.next())
+        assertEquals(1, rs.getInt(1))
+        assertEquals('rec1', rs.getString(2))
+        assertEquals(201, rs.getInt(3))
+        assertTrue(rs.next())
+        assertEquals(2, rs.getInt(1))
+        assertEquals('rec2', rs.getString(2))
+        assertEquals(202, rs.getInt(3))
+        assertFalse(rs.next())
+
+        stmt.close()
+        conn.close()
+    }
+
+    @Test
     void testDelete() throws InitializationException, ProcessException, SQLException, IOException {
         recreateTable(createPersons)
         Connection conn = dbcp.getConnection()