You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ma...@apache.org on 2020/06/09 20:03:49 UTC

[nifi] branch master updated: NIFI-7313:fix bug on 'Quote Table Identifiers' NIFI-7313:add test by wanghongqi NIFI-7313:edit test

This is an automated email from the ASF dual-hosted git repository.

mattyb149 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/master by this push:
     new 3c757b6  NIFI-7313:fix bug on 'Quote Table Identifiers' NIFI-7313:add test by wanghongqi NIFI-7313:edit test
3c757b6 is described below

commit 3c757b6ba8518160bb4e07130d4f92c7b3b6c124
Author: wanghq-c <wa...@glodon.com>
AuthorDate: Fri Apr 3 16:47:31 2020 +0800

    NIFI-7313:fix bug on 'Quote Table Identifiers'
    NIFI-7313:add test by wanghongqi
    NIFI-7313:edit test
    
    Signed-off-by: Matthew Burgess <ma...@apache.org>
    
    This closes #4185
---
 .../processors/standard/PutDatabaseRecord.java     | 67 ++++++++++++----------
 .../standard/TestPutDatabaseRecord.groovy          | 43 ++++++++++++++
 2 files changed, 80 insertions(+), 30 deletions(-)

diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutDatabaseRecord.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutDatabaseRecord.java
index 55adabb..926c5cd 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutDatabaseRecord.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutDatabaseRecord.java
@@ -640,15 +640,8 @@ public class PutDatabaseRecord extends AbstractSessionFactoryProcessor {
         }
 
         // build the fully qualified table name
-        final StringBuilder tableNameBuilder = new StringBuilder();
-        if (catalog != null) {
-            tableNameBuilder.append(catalog).append(".");
-        }
-        if (schemaName != null) {
-            tableNameBuilder.append(schemaName).append(".");
-        }
-        tableNameBuilder.append(tableName);
-        final String fqTableName = tableNameBuilder.toString();
+
+        final String fqTableName =  generateTableName(settings, catalog, schemaName, tableName, tableSchema);
 
         if (recordSchema == null) {
             throw new IllegalArgumentException("No record schema specified!");
@@ -754,6 +747,38 @@ public class PutDatabaseRecord extends AbstractSessionFactoryProcessor {
 
     }
 
+    private String generateTableName(DMLSettings settings, String catalog, String schemaName, String tableName, TableSchema tableSchema) {
+        final StringBuilder tableNameBuilder = new StringBuilder();
+        if (catalog != null) {
+            if (settings.quoteTableName) {
+                tableNameBuilder.append(tableSchema.getQuotedIdentifierString())
+                        .append(catalog)
+                        .append(tableSchema.getQuotedIdentifierString());
+            } else {
+                tableNameBuilder.append(catalog);
+            }
+            tableNameBuilder.append(".");
+        }
+        if (schemaName != null) {
+            if (settings.quoteTableName) {
+                tableNameBuilder.append(tableSchema.getQuotedIdentifierString())
+                        .append(schemaName)
+                        .append(tableSchema.getQuotedIdentifierString());
+            } else {
+                tableNameBuilder.append(schemaName);
+            }
+            tableNameBuilder.append(".");
+        }
+        if (settings.quoteTableName) {
+            tableNameBuilder.append(tableSchema.getQuotedIdentifierString())
+                    .append(tableName)
+                    .append(tableSchema.getQuotedIdentifierString());
+        } else {
+            tableNameBuilder.append(tableName);
+        }
+        return tableNameBuilder.toString();
+    }
+
     private Set<String> getNormalizedColumnNames(final RecordSchema schema, final boolean translateFieldNames) {
         final Set<String> normalizedFieldNames = new HashSet<>();
         if (schema != null) {
@@ -782,13 +807,7 @@ public class PutDatabaseRecord extends AbstractSessionFactoryProcessor {
 
         final StringBuilder sqlBuilder = new StringBuilder();
         sqlBuilder.append("INSERT INTO ");
-        if (settings.quoteTableName) {
-            sqlBuilder.append(tableSchema.getQuotedIdentifierString())
-                    .append(tableName)
-                    .append(tableSchema.getQuotedIdentifierString());
-        } else {
-            sqlBuilder.append(tableName);
-        }
+        sqlBuilder.append(tableName);
         sqlBuilder.append(" (");
 
         // iterate over all of the fields in the record, building the SQL statement by adding the column names
@@ -855,13 +874,7 @@ public class PutDatabaseRecord extends AbstractSessionFactoryProcessor {
 
         final StringBuilder sqlBuilder = new StringBuilder();
         sqlBuilder.append("UPDATE ");
-        if (settings.quoteTableName) {
-            sqlBuilder.append(tableSchema.getQuotedIdentifierString())
-                    .append(tableName)
-                    .append(tableSchema.getQuotedIdentifierString());
-        } else {
-            sqlBuilder.append(tableName);
-        }
+        sqlBuilder.append(tableName);
 
         // Create a Set of all normalized Update Key names, and ensure that there is a field in the record
         // for each of the Update Key fields.
@@ -980,13 +993,7 @@ public class PutDatabaseRecord extends AbstractSessionFactoryProcessor {
 
         final StringBuilder sqlBuilder = new StringBuilder();
         sqlBuilder.append("DELETE FROM ");
-        if (settings.quoteTableName) {
-            sqlBuilder.append(tableSchema.getQuotedIdentifierString())
-                    .append(tableName)
-                    .append(tableSchema.getQuotedIdentifierString());
-        } else {
-            sqlBuilder.append(tableName);
-        }
+        sqlBuilder.append(tableName);
 
         // iterate over all of the fields in the record, building the SQL statement by adding the column names
         List<String> fieldNames = recordSchema.getFieldNames();
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/groovy/org/apache/nifi/processors/standard/TestPutDatabaseRecord.groovy b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/groovy/org/apache/nifi/processors/standard/TestPutDatabaseRecord.groovy
index 080502c..24e68f8 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/groovy/org/apache/nifi/processors/standard/TestPutDatabaseRecord.groovy
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/groovy/org/apache/nifi/processors/standard/TestPutDatabaseRecord.groovy
@@ -915,4 +915,47 @@ class TestPutDatabaseRecord {
         stmt.close()
         conn.close()
     }
+    @Test
+    void testGenerateTableName() throws Exception {
+
+        final List<RecordField> fields = [new RecordField('id', RecordFieldType.INT.dataType),
+                                          new RecordField('name', RecordFieldType.STRING.dataType),
+                                          new RecordField('code', RecordFieldType.INT.dataType),
+                                          new RecordField('non_existing', RecordFieldType.BOOLEAN.dataType)]
+
+        def schema = [
+                getFields    : {fields},
+                getFieldCount: {fields.size()},
+                getField     : {int index -> fields[index]},
+                getDataTypes : {fields.collect {it.dataType}},
+                getFieldNames: {fields.collect {it.fieldName}},
+                getDataType  : {fieldName -> fields.find {it.fieldName == fieldName}.dataType}
+        ] as RecordSchema
+
+        def tableSchema = [
+                [
+                        new PutDatabaseRecord.ColumnDescription('id', 4, true, 2),
+                        new PutDatabaseRecord.ColumnDescription('name', 12, true, 255),
+                        new PutDatabaseRecord.ColumnDescription('code', 4, true, 10)
+                ],
+                false,
+                ['id'] as Set<String>,
+                '"'
+
+        ] as PutDatabaseRecord.TableSchema
+
+        runner.setProperty(PutDatabaseRecord.TRANSLATE_FIELD_NAMES, 'false')
+        runner.setProperty(PutDatabaseRecord.UNMATCHED_FIELD_BEHAVIOR, PutDatabaseRecord.IGNORE_UNMATCHED_FIELD)
+        runner.setProperty(PutDatabaseRecord.UNMATCHED_COLUMN_BEHAVIOR, PutDatabaseRecord.IGNORE_UNMATCHED_COLUMN)
+        runner.setProperty(PutDatabaseRecord.QUOTED_IDENTIFIERS, 'true')
+        runner.setProperty(PutDatabaseRecord.QUOTED_TABLE_IDENTIFIER, 'true')
+        def settings = new PutDatabaseRecord.DMLSettings(runner.getProcessContext())
+
+        processor.with {
+
+            assertEquals('"test_catalog"."test_schema"."test_table"',
+                    generateTableName(settings,"test_catalog","test_schema","test_table",tableSchema))
+
+        }
+    }
 }