You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ma...@apache.org on 2020/06/09 20:03:49 UTC
[nifi] branch master updated: NIFI-7313:fix bug on 'Quote Table
Identifiers' NIFI-7313:add test by wanghongqi NIFI-7313:edit test
This is an automated email from the ASF dual-hosted git repository.
mattyb149 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/master by this push:
new 3c757b6 NIFI-7313:fix bug on 'Quote Table Identifiers' NIFI-7313:add test by wanghongqi NIFI-7313:edit test
3c757b6 is described below
commit 3c757b6ba8518160bb4e07130d4f92c7b3b6c124
Author: wanghq-c <wa...@glodon.com>
AuthorDate: Fri Apr 3 16:47:31 2020 +0800
NIFI-7313:fix bug on 'Quote Table Identifiers'
NIFI-7313:add test by wanghongqi
NIFI-7313:edit test
Signed-off-by: Matthew Burgess <ma...@apache.org>
This closes #4185
---
.../processors/standard/PutDatabaseRecord.java | 67 ++++++++++++----------
.../standard/TestPutDatabaseRecord.groovy | 43 ++++++++++++++
2 files changed, 80 insertions(+), 30 deletions(-)
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutDatabaseRecord.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutDatabaseRecord.java
index 55adabb..926c5cd 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutDatabaseRecord.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutDatabaseRecord.java
@@ -640,15 +640,8 @@ public class PutDatabaseRecord extends AbstractSessionFactoryProcessor {
}
// build the fully qualified table name
- final StringBuilder tableNameBuilder = new StringBuilder();
- if (catalog != null) {
- tableNameBuilder.append(catalog).append(".");
- }
- if (schemaName != null) {
- tableNameBuilder.append(schemaName).append(".");
- }
- tableNameBuilder.append(tableName);
- final String fqTableName = tableNameBuilder.toString();
+
+ final String fqTableName = generateTableName(settings, catalog, schemaName, tableName, tableSchema);
if (recordSchema == null) {
throw new IllegalArgumentException("No record schema specified!");
@@ -754,6 +747,38 @@ public class PutDatabaseRecord extends AbstractSessionFactoryProcessor {
}
+ private String generateTableName(DMLSettings settings, String catalog, String schemaName, String tableName, TableSchema tableSchema) {
+ final StringBuilder tableNameBuilder = new StringBuilder();
+ if (catalog != null) {
+ if (settings.quoteTableName) {
+ tableNameBuilder.append(tableSchema.getQuotedIdentifierString())
+ .append(catalog)
+ .append(tableSchema.getQuotedIdentifierString());
+ } else {
+ tableNameBuilder.append(catalog);
+ }
+ tableNameBuilder.append(".");
+ }
+ if (schemaName != null) {
+ if (settings.quoteTableName) {
+ tableNameBuilder.append(tableSchema.getQuotedIdentifierString())
+ .append(schemaName)
+ .append(tableSchema.getQuotedIdentifierString());
+ } else {
+ tableNameBuilder.append(schemaName);
+ }
+ tableNameBuilder.append(".");
+ }
+ if (settings.quoteTableName) {
+ tableNameBuilder.append(tableSchema.getQuotedIdentifierString())
+ .append(tableName)
+ .append(tableSchema.getQuotedIdentifierString());
+ } else {
+ tableNameBuilder.append(tableName);
+ }
+ return tableNameBuilder.toString();
+ }
+
private Set<String> getNormalizedColumnNames(final RecordSchema schema, final boolean translateFieldNames) {
final Set<String> normalizedFieldNames = new HashSet<>();
if (schema != null) {
@@ -782,13 +807,7 @@ public class PutDatabaseRecord extends AbstractSessionFactoryProcessor {
final StringBuilder sqlBuilder = new StringBuilder();
sqlBuilder.append("INSERT INTO ");
- if (settings.quoteTableName) {
- sqlBuilder.append(tableSchema.getQuotedIdentifierString())
- .append(tableName)
- .append(tableSchema.getQuotedIdentifierString());
- } else {
- sqlBuilder.append(tableName);
- }
+ sqlBuilder.append(tableName);
sqlBuilder.append(" (");
// iterate over all of the fields in the record, building the SQL statement by adding the column names
@@ -855,13 +874,7 @@ public class PutDatabaseRecord extends AbstractSessionFactoryProcessor {
final StringBuilder sqlBuilder = new StringBuilder();
sqlBuilder.append("UPDATE ");
- if (settings.quoteTableName) {
- sqlBuilder.append(tableSchema.getQuotedIdentifierString())
- .append(tableName)
- .append(tableSchema.getQuotedIdentifierString());
- } else {
- sqlBuilder.append(tableName);
- }
+ sqlBuilder.append(tableName);
// Create a Set of all normalized Update Key names, and ensure that there is a field in the record
// for each of the Update Key fields.
@@ -980,13 +993,7 @@ public class PutDatabaseRecord extends AbstractSessionFactoryProcessor {
final StringBuilder sqlBuilder = new StringBuilder();
sqlBuilder.append("DELETE FROM ");
- if (settings.quoteTableName) {
- sqlBuilder.append(tableSchema.getQuotedIdentifierString())
- .append(tableName)
- .append(tableSchema.getQuotedIdentifierString());
- } else {
- sqlBuilder.append(tableName);
- }
+ sqlBuilder.append(tableName);
// iterate over all of the fields in the record, building the SQL statement by adding the column names
List<String> fieldNames = recordSchema.getFieldNames();
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/groovy/org/apache/nifi/processors/standard/TestPutDatabaseRecord.groovy b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/groovy/org/apache/nifi/processors/standard/TestPutDatabaseRecord.groovy
index 080502c..24e68f8 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/groovy/org/apache/nifi/processors/standard/TestPutDatabaseRecord.groovy
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/groovy/org/apache/nifi/processors/standard/TestPutDatabaseRecord.groovy
@@ -915,4 +915,47 @@ class TestPutDatabaseRecord {
stmt.close()
conn.close()
}
+ @Test
+ void testGenerateTableName() throws Exception {
+
+ final List<RecordField> fields = [new RecordField('id', RecordFieldType.INT.dataType),
+ new RecordField('name', RecordFieldType.STRING.dataType),
+ new RecordField('code', RecordFieldType.INT.dataType),
+ new RecordField('non_existing', RecordFieldType.BOOLEAN.dataType)]
+
+ def schema = [
+ getFields : {fields},
+ getFieldCount: {fields.size()},
+ getField : {int index -> fields[index]},
+ getDataTypes : {fields.collect {it.dataType}},
+ getFieldNames: {fields.collect {it.fieldName}},
+ getDataType : {fieldName -> fields.find {it.fieldName == fieldName}.dataType}
+ ] as RecordSchema
+
+ def tableSchema = [
+ [
+ new PutDatabaseRecord.ColumnDescription('id', 4, true, 2),
+ new PutDatabaseRecord.ColumnDescription('name', 12, true, 255),
+ new PutDatabaseRecord.ColumnDescription('code', 4, true, 10)
+ ],
+ false,
+ ['id'] as Set<String>,
+ '"'
+
+ ] as PutDatabaseRecord.TableSchema
+
+ runner.setProperty(PutDatabaseRecord.TRANSLATE_FIELD_NAMES, 'false')
+ runner.setProperty(PutDatabaseRecord.UNMATCHED_FIELD_BEHAVIOR, PutDatabaseRecord.IGNORE_UNMATCHED_FIELD)
+ runner.setProperty(PutDatabaseRecord.UNMATCHED_COLUMN_BEHAVIOR, PutDatabaseRecord.IGNORE_UNMATCHED_COLUMN)
+ runner.setProperty(PutDatabaseRecord.QUOTED_IDENTIFIERS, 'true')
+ runner.setProperty(PutDatabaseRecord.QUOTED_TABLE_IDENTIFIER, 'true')
+ def settings = new PutDatabaseRecord.DMLSettings(runner.getProcessContext())
+
+ processor.with {
+
+ assertEquals('"test_catalog"."test_schema"."test_table"',
+ generateTableName(settings,"test_catalog","test_schema","test_table",tableSchema))
+
+ }
+ }
}