You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ma...@apache.org on 2015/12/03 15:48:46 UTC
nifi git commit: NIFI-1244: Incorporate Schema Name property
Repository: nifi
Updated Branches:
refs/heads/master ecc240b91 -> 6b75eda9a
NIFI-1244: Incorporate Schema Name property
Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/6b75eda9
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/6b75eda9
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/6b75eda9
Branch: refs/heads/master
Commit: 6b75eda9ab6fd509bd813e149f99a02bf2466943
Parents: ecc240b
Author: Mark Payne <ma...@hotmail.com>
Authored: Wed Dec 2 16:36:38 2015 -0500
Committer: Mark Payne <ma...@hotmail.com>
Committed: Thu Dec 3 09:48:27 2015 -0500
----------------------------------------------------------------------
.../processors/standard/ConvertJSONToSQL.java | 30 ++++++++++++++++----
1 file changed, 25 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/nifi/blob/6b75eda9/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ConvertJSONToSQL.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ConvertJSONToSQL.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ConvertJSONToSQL.java
index c46d3e2..902af51 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ConvertJSONToSQL.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ConvertJSONToSQL.java
@@ -128,6 +128,13 @@ public class ConvertJSONToSQL extends AbstractProcessor {
.expressionLanguageSupported(true)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();
+ static final PropertyDescriptor SCHEMA_NAME = new PropertyDescriptor.Builder()
+ .name("Schema Name")
+ .description("The name of the schema that the table belongs to. This may not apply for the database that you are updating. In this case, leave the field empty")
+ .required(false)
+ .expressionLanguageSupported(true)
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+ .build();
static final PropertyDescriptor TRANSLATE_FIELD_NAMES = new PropertyDescriptor.Builder()
.name("Translate Field Names")
.description("If true, the Processor will attempt to translate JSON field names into the appropriate column names for the table specified. "
@@ -183,6 +190,7 @@ public class ConvertJSONToSQL extends AbstractProcessor {
properties.add(STATEMENT_TYPE);
properties.add(TABLE_NAME);
properties.add(CATALOG_NAME);
+ properties.add(SCHEMA_NAME);
properties.add(TRANSLATE_FIELD_NAMES);
properties.add(UNMATCHED_FIELD_BEHAVIOR);
properties.add(UPDATE_KEY);
@@ -220,6 +228,7 @@ public class ConvertJSONToSQL extends AbstractProcessor {
final String updateKeys = context.getProperty(UPDATE_KEY).evaluateAttributeExpressions(flowFile).getValue();
final String catalog = context.getProperty(CATALOG_NAME).evaluateAttributeExpressions(flowFile).getValue();
+ final String schemaName = context.getProperty(SCHEMA_NAME).evaluateAttributeExpressions(flowFile).getValue();
final String tableName = context.getProperty(TABLE_NAME).evaluateAttributeExpressions(flowFile).getValue();
final SchemaKey schemaKey = new SchemaKey(catalog, tableName);
final boolean includePrimaryKeys = UPDATE_TYPE.equals(statementType) && updateKeys == null;
@@ -235,7 +244,7 @@ public class ConvertJSONToSQL extends AbstractProcessor {
// No schema exists for this table yet. Query the database to determine the schema and put it into the cache.
final DBCPService dbcpService = context.getProperty(CONNECTION_POOL).asControllerService(DBCPService.class);
try (final Connection conn = dbcpService.getConnection()) {
- schema = TableSchema.from(conn, catalog, tableName, translateFieldNames, includePrimaryKeys);
+ schema = TableSchema.from(conn, catalog, schemaName, tableName, translateFieldNames, includePrimaryKeys);
schemaCache.put(schemaKey, schema);
} catch (final SQLException e) {
getLogger().error("Failed to convert {} into a SQL statement due to {}; routing to failure", new Object[] {flowFile, e.toString()}, e);
@@ -288,10 +297,21 @@ public class ConvertJSONToSQL extends AbstractProcessor {
final Map<String, String> attributes = new HashMap<>();
try {
+ // build the fully qualified table name
+ final StringBuilder tableNameBuilder = new StringBuilder();
+ if (catalog != null) {
+ tableNameBuilder.append(catalog).append(".");
+ }
+ if (schemaName != null) {
+ tableNameBuilder.append(schemaName).append(".");
+ }
+ tableNameBuilder.append(tableName);
+ final String fqTableName = tableNameBuilder.toString();
+
if (INSERT_TYPE.equals(statementType)) {
- sql = generateInsert(jsonNode, attributes, tableName, schema, translateFieldNames, ignoreUnmappedFields);
+ sql = generateInsert(jsonNode, attributes, fqTableName, schema, translateFieldNames, ignoreUnmappedFields);
} else {
- sql = generateUpdate(jsonNode, attributes, tableName, updateKeys, schema, translateFieldNames, ignoreUnmappedFields);
+ sql = generateUpdate(jsonNode, attributes, fqTableName, updateKeys, schema, translateFieldNames, ignoreUnmappedFields);
}
} catch (final ProcessException pe) {
getLogger().error("Failed to convert {} to a SQL {} statement due to {}; routing to failure",
@@ -559,9 +579,9 @@ public class ConvertJSONToSQL extends AbstractProcessor {
return primaryKeyColumnNames;
}
- public static TableSchema from(final Connection conn, final String catalog, final String tableName,
+ public static TableSchema from(final Connection conn, final String catalog, final String schema, final String tableName,
final boolean translateColumnNames, final boolean includePrimaryKeys) throws SQLException {
- try (final ResultSet colrs = conn.getMetaData().getColumns(catalog, null, tableName, "%")) {
+ try (final ResultSet colrs = conn.getMetaData().getColumns(catalog, schema, tableName, "%")) {
final List<ColumnDescription> cols = new ArrayList<>();
while (colrs.next()) {