You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ma...@apache.org on 2015/12/03 15:48:46 UTC

nifi git commit: NIFI-1244: Incorporate Schema Name property

Repository: nifi
Updated Branches:
  refs/heads/master ecc240b91 -> 6b75eda9a


NIFI-1244: Incorporate Schema Name property


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/6b75eda9
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/6b75eda9
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/6b75eda9

Branch: refs/heads/master
Commit: 6b75eda9ab6fd509bd813e149f99a02bf2466943
Parents: ecc240b
Author: Mark Payne <ma...@hotmail.com>
Authored: Wed Dec 2 16:36:38 2015 -0500
Committer: Mark Payne <ma...@hotmail.com>
Committed: Thu Dec 3 09:48:27 2015 -0500

----------------------------------------------------------------------
 .../processors/standard/ConvertJSONToSQL.java   | 30 ++++++++++++++++----
 1 file changed, 25 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/6b75eda9/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ConvertJSONToSQL.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ConvertJSONToSQL.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ConvertJSONToSQL.java
index c46d3e2..902af51 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ConvertJSONToSQL.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ConvertJSONToSQL.java
@@ -128,6 +128,13 @@ public class ConvertJSONToSQL extends AbstractProcessor {
             .expressionLanguageSupported(true)
             .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
             .build();
+    static final PropertyDescriptor SCHEMA_NAME = new PropertyDescriptor.Builder()
+            .name("Schema Name")
+            .description("The name of the schema that the table belongs to. This may not apply for the database that you are updating. In this case, leave the field empty")
+            .required(false)
+            .expressionLanguageSupported(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .build();
     static final PropertyDescriptor TRANSLATE_FIELD_NAMES = new PropertyDescriptor.Builder()
             .name("Translate Field Names")
             .description("If true, the Processor will attempt to translate JSON field names into the appropriate column names for the table specified. "
@@ -183,6 +190,7 @@ public class ConvertJSONToSQL extends AbstractProcessor {
         properties.add(STATEMENT_TYPE);
         properties.add(TABLE_NAME);
         properties.add(CATALOG_NAME);
+        properties.add(SCHEMA_NAME);
         properties.add(TRANSLATE_FIELD_NAMES);
         properties.add(UNMATCHED_FIELD_BEHAVIOR);
         properties.add(UPDATE_KEY);
@@ -220,6 +228,7 @@ public class ConvertJSONToSQL extends AbstractProcessor {
         final String updateKeys = context.getProperty(UPDATE_KEY).evaluateAttributeExpressions(flowFile).getValue();
 
         final String catalog = context.getProperty(CATALOG_NAME).evaluateAttributeExpressions(flowFile).getValue();
+        final String schemaName = context.getProperty(SCHEMA_NAME).evaluateAttributeExpressions(flowFile).getValue();
         final String tableName = context.getProperty(TABLE_NAME).evaluateAttributeExpressions(flowFile).getValue();
         final SchemaKey schemaKey = new SchemaKey(catalog, tableName);
         final boolean includePrimaryKeys = UPDATE_TYPE.equals(statementType) && updateKeys == null;
@@ -235,7 +244,7 @@ public class ConvertJSONToSQL extends AbstractProcessor {
                 // No schema exists for this table yet. Query the database to determine the schema and put it into the cache.
                 final DBCPService dbcpService = context.getProperty(CONNECTION_POOL).asControllerService(DBCPService.class);
                 try (final Connection conn = dbcpService.getConnection()) {
-                    schema = TableSchema.from(conn, catalog, tableName, translateFieldNames, includePrimaryKeys);
+                    schema = TableSchema.from(conn, catalog, schemaName, tableName, translateFieldNames, includePrimaryKeys);
                     schemaCache.put(schemaKey, schema);
                 } catch (final SQLException e) {
                     getLogger().error("Failed to convert {} into a SQL statement due to {}; routing to failure", new Object[] {flowFile, e.toString()}, e);
@@ -288,10 +297,21 @@ public class ConvertJSONToSQL extends AbstractProcessor {
             final Map<String, String> attributes = new HashMap<>();
 
             try {
+                // build the fully qualified table name
+                final StringBuilder tableNameBuilder = new StringBuilder();
+                if (catalog != null) {
+                    tableNameBuilder.append(catalog).append(".");
+                }
+                if (schemaName != null) {
+                    tableNameBuilder.append(schemaName).append(".");
+                }
+                tableNameBuilder.append(tableName);
+                final String fqTableName = tableNameBuilder.toString();
+
                 if (INSERT_TYPE.equals(statementType)) {
-                    sql = generateInsert(jsonNode, attributes, tableName, schema, translateFieldNames, ignoreUnmappedFields);
+                    sql = generateInsert(jsonNode, attributes, fqTableName, schema, translateFieldNames, ignoreUnmappedFields);
                 } else {
-                    sql = generateUpdate(jsonNode, attributes, tableName, updateKeys, schema, translateFieldNames, ignoreUnmappedFields);
+                    sql = generateUpdate(jsonNode, attributes, fqTableName, updateKeys, schema, translateFieldNames, ignoreUnmappedFields);
                 }
             } catch (final ProcessException pe) {
                 getLogger().error("Failed to convert {} to a SQL {} statement due to {}; routing to failure",
@@ -559,9 +579,9 @@ public class ConvertJSONToSQL extends AbstractProcessor {
             return primaryKeyColumnNames;
         }
 
-        public static TableSchema from(final Connection conn, final String catalog, final String tableName,
+        public static TableSchema from(final Connection conn, final String catalog, final String schema, final String tableName,
                 final boolean translateColumnNames, final boolean includePrimaryKeys) throws SQLException {
-            try (final ResultSet colrs = conn.getMetaData().getColumns(catalog, null, tableName, "%")) {
+            try (final ResultSet colrs = conn.getMetaData().getColumns(catalog, schema, tableName, "%")) {
 
                 final List<ColumnDescription> cols = new ArrayList<>();
                 while (colrs.next()) {