You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ma...@apache.org on 2016/02/01 17:55:54 UTC

[1/3] nifi git commit: NIFI-1093 added support for handling non-required columns Mostly based on contribution from Daniel Cave

Repository: nifi
Updated Branches:
  refs/heads/master dbe8ff3f4 -> ef80549d6


NIFI-1093 added support for handling non-required columns
Mostly based on contribution from Daniel Cave


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/01268002
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/01268002
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/01268002

Branch: refs/heads/master
Commit: 012680020173a2e3db2df2f7a1cb633bf356c2f0
Parents: 73c0637
Author: Oleg Zhurakousky <ol...@suitcase.io>
Authored: Fri Jan 29 13:41:27 2016 -0500
Committer: Oleg Zhurakousky <ol...@suitcase.io>
Committed: Fri Jan 29 13:41:27 2016 -0500

----------------------------------------------------------------------
 .../processors/standard/ConvertJSONToSQL.java   |  47 ++++-
 .../standard/TestConvertJSONToSQL.java          | 204 +++++++++++++++++++
 2 files changed, 245 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/01268002/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ConvertJSONToSQL.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ConvertJSONToSQL.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ConvertJSONToSQL.java
index 4125eeb..f8306a2 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ConvertJSONToSQL.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ConvertJSONToSQL.java
@@ -100,6 +100,15 @@ public class ConvertJSONToSQL extends AbstractProcessor {
         "Any field in the JSON document that cannot be mapped to a column in the database is ignored");
     static final AllowableValue FAIL_UNMATCHED_FIELD = new AllowableValue("Fail", "Fail",
         "If the JSON document has any field that cannot be mapped to a column in the database, the FlowFile will be routed to the failure relationship");
+    static final AllowableValue IGNORE_UNMATCHED_COLUMN = new AllowableValue("Ignore Unmatched Columns",
+            "Ignore Unmatched Columns",
+            "Any column in the database that does not have a field in the JSON document will be assumed to not be required.  No notification will be logged");
+    static final AllowableValue WARNING_UNMATCHED_COLUMN = new AllowableValue("Warn on Unmatched Columns",
+            "Warning Unmatched Columns",
+            "Any column in the database that does not have a field in the JSON document will be assumed to not be required.  A warning will be logged");
+    static final AllowableValue FAIL_UNMATCHED_COLUMN = new AllowableValue("Fail on Unmatched Columns",
+            "Fail on Unmatched Columns",
+            "A flow will fail if any column in the database that does not have a field in the JSON document.  An error will be logged");
 
     static final PropertyDescriptor CONNECTION_POOL = new PropertyDescriptor.Builder()
             .name("JDBC Connection Pool")
@@ -159,6 +168,12 @@ public class ConvertJSONToSQL extends AbstractProcessor {
             .expressionLanguageSupported(true)
             .build();
 
+    static final PropertyDescriptor UNMATCHED_COLUMN_BEHAVIOR = new PropertyDescriptor.Builder()
+            .name("Unmatched Column Behavior")
+            .description("If an incoming JSON element does not have a field mapping for all of the database table's columns, this property specifies how to handle the situation")
+            .allowableValues(IGNORE_UNMATCHED_COLUMN, WARNING_UNMATCHED_COLUMN ,FAIL_UNMATCHED_COLUMN)
+            .defaultValue(FAIL_UNMATCHED_COLUMN.getValue())
+            .build();
 
     static final Relationship REL_ORIGINAL = new Relationship.Builder()
             .name("original")
@@ -193,6 +208,7 @@ public class ConvertJSONToSQL extends AbstractProcessor {
         properties.add(SCHEMA_NAME);
         properties.add(TRANSLATE_FIELD_NAMES);
         properties.add(UNMATCHED_FIELD_BEHAVIOR);
+        properties.add(UNMATCHED_COLUMN_BEHAVIOR);
         properties.add(UPDATE_KEY);
         return properties;
     }
@@ -233,6 +249,10 @@ public class ConvertJSONToSQL extends AbstractProcessor {
         final SchemaKey schemaKey = new SchemaKey(catalog, tableName);
         final boolean includePrimaryKeys = UPDATE_TYPE.equals(statementType) && updateKeys == null;
 
+        // Is the unmatched column behaviour fail or warning?
+        final boolean failUnmappedColumns = FAIL_UNMATCHED_COLUMN.getValue().equalsIgnoreCase(context.getProperty(UNMATCHED_COLUMN_BEHAVIOR).getValue());
+        final boolean warningUnmappedColumns = WARNING_UNMATCHED_COLUMN.getValue().equalsIgnoreCase(context.getProperty(UNMATCHED_COLUMN_BEHAVIOR).getValue());
+
         // get the database schema from the cache, if one exists. We do this in a synchronized block, rather than
         // using a ConcurrentMap because the Map that we are using is a LinkedHashMap with a capacity such that if
         // the Map grows beyond this capacity, old elements are evicted. We do this in order to avoid filling the
@@ -309,9 +329,9 @@ public class ConvertJSONToSQL extends AbstractProcessor {
                 final String fqTableName = tableNameBuilder.toString();
 
                 if (INSERT_TYPE.equals(statementType)) {
-                    sql = generateInsert(jsonNode, attributes, fqTableName, schema, translateFieldNames, ignoreUnmappedFields);
+                    sql = generateInsert(jsonNode, attributes, fqTableName, schema, translateFieldNames, ignoreUnmappedFields, failUnmappedColumns, warningUnmappedColumns);
                 } else {
-                    sql = generateUpdate(jsonNode, attributes, fqTableName, updateKeys, schema, translateFieldNames, ignoreUnmappedFields);
+                    sql = generateUpdate(jsonNode, attributes, fqTableName, updateKeys, schema, translateFieldNames, ignoreUnmappedFields, failUnmappedColumns, warningUnmappedColumns);
                 }
             } catch (final ProcessException pe) {
                 getLogger().error("Failed to convert {} to a SQL {} statement due to {}; routing to failure",
@@ -359,13 +379,20 @@ public class ConvertJSONToSQL extends AbstractProcessor {
     }
 
     private String generateInsert(final JsonNode rootNode, final Map<String, String> attributes, final String tableName,
-        final TableSchema schema, final boolean translateFieldNames, final boolean ignoreUnmappedFields) {
+        final TableSchema schema, final boolean translateFieldNames, final boolean ignoreUnmappedFields, final boolean failUnmappedColumns,
+        final boolean warningUnmappedColumns) {
 
         final Set<String> normalizedFieldNames = getNormalizedColumnNames(rootNode, translateFieldNames);
         for (final String requiredColName : schema.getRequiredColumnNames()) {
             final String normalizedColName = normalizeColumnName(requiredColName, translateFieldNames);
             if (!normalizedFieldNames.contains(normalizedColName)) {
-                throw new ProcessException("JSON does not have a value for the Required column '" + requiredColName + "'");
+                String missingColMessage = "JSON does not have a value for the Required column '" + requiredColName + "'";
+                if (failUnmappedColumns) {
+                    getLogger().error(missingColMessage);
+                    throw new ProcessException(missingColMessage);
+                } else if (warningUnmappedColumns) {
+                    getLogger().warn(missingColMessage);
+                }
             }
         }
 
@@ -426,7 +453,8 @@ public class ConvertJSONToSQL extends AbstractProcessor {
     }
 
     private String generateUpdate(final JsonNode rootNode, final Map<String, String> attributes, final String tableName, final String updateKeys,
-        final TableSchema schema, final boolean translateFieldNames, final boolean ignoreUnmappedFields) {
+        final TableSchema schema, final boolean translateFieldNames, final boolean ignoreUnmappedFields, final boolean failUnmappedColumns,
+        final boolean warningUnmappedColumns) {
 
         final Set<String> updateKeyNames;
         if (updateKeys == null) {
@@ -456,7 +484,14 @@ public class ConvertJSONToSQL extends AbstractProcessor {
             normalizedUpdateNames.add(normalizedUK);
 
             if (!normalizedFieldNames.contains(normalizedUK)) {
-                throw new ProcessException("JSON does not have a value for the " + (updateKeys == null ? "Primary" : "Update") + "Key column '" + uk + "'");
+                String missingColMessage = "JSON does not have a value for the "
+                        + (updateKeys == null ? "Primary" : "Update") + "Key column '" + uk + "'";
+                if (failUnmappedColumns) {
+                    getLogger().error(missingColMessage);
+                    throw new ProcessException(missingColMessage);
+                } else if (warningUnmappedColumns) {
+                    getLogger().warn(missingColMessage);
+                }
             }
         }
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/01268002/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestConvertJSONToSQL.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestConvertJSONToSQL.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestConvertJSONToSQL.java
index 7422dbc..f4d5b96 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestConvertJSONToSQL.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestConvertJSONToSQL.java
@@ -400,6 +400,210 @@ public class TestConvertJSONToSQL {
         runner.assertAllFlowFilesTransferred(ConvertJSONToSQL.REL_FAILURE, 1);
     }
 
+    @Test
+    public void testInsertWithMissingColumnFail()
+            throws InitializationException, ProcessException, SQLException, IOException {
+        final TestRunner runner = TestRunners.newTestRunner(ConvertJSONToSQL.class);
+        final File tempDir = folder.getRoot();
+        final File dbDir = new File(tempDir, "db");
+        final DBCPService service = new MockDBCPService(dbDir.getAbsolutePath());
+        runner.addControllerService("dbcp", service);
+        runner.enableControllerService(service);
+
+        try (final Connection conn = service.getConnection()) {
+            try (final Statement stmt = conn.createStatement()) {
+                stmt.executeUpdate(
+                        "CREATE TABLE PERSONS (id integer, name varchar(100), code integer, generated_key integer primary key)");
+            }
+        }
+
+        runner.setProperty(ConvertJSONToSQL.CONNECTION_POOL, "dbcp");
+        runner.setProperty(ConvertJSONToSQL.TABLE_NAME, "PERSONS");
+        runner.setProperty(ConvertJSONToSQL.STATEMENT_TYPE, "INSERT");
+        runner.setProperty(ConvertJSONToSQL.UNMATCHED_COLUMN_BEHAVIOR, "Fail on Unmatched Columns");
+        runner.enqueue(Paths.get("src/test/resources/TestConvertJSONToSQL/person-1.json"));
+        runner.run();
+
+        runner.assertAllFlowFilesTransferred(ConvertJSONToSQL.REL_FAILURE, 1);
+    } // End testInsertWithMissingColumnFail()
+
+    @Test
+    public void testInsertWithMissingColumnWarning()
+            throws InitializationException, ProcessException, SQLException, IOException {
+        final TestRunner runner = TestRunners.newTestRunner(ConvertJSONToSQL.class);
+        final File tempDir = folder.getRoot();
+        final File dbDir = new File(tempDir, "db");
+        final DBCPService service = new MockDBCPService(dbDir.getAbsolutePath());
+        runner.addControllerService("dbcp", service);
+        runner.enableControllerService(service);
+
+        try (final Connection conn = service.getConnection()) {
+            try (final Statement stmt = conn.createStatement()) {
+                stmt.executeUpdate(
+                        "CREATE TABLE PERSONS (id integer, name varchar(100), code integer, generated_key integer primary key)");
+            }
+        }
+
+        runner.setProperty(ConvertJSONToSQL.CONNECTION_POOL, "dbcp");
+        runner.setProperty(ConvertJSONToSQL.TABLE_NAME, "PERSONS");
+        runner.setProperty(ConvertJSONToSQL.STATEMENT_TYPE, "INSERT");
+        runner.setProperty(ConvertJSONToSQL.UNMATCHED_COLUMN_BEHAVIOR, "Warn on Unmatched Columns");
+        runner.enqueue(Paths.get("src/test/resources/TestConvertJSONToSQL/person-1.json"));
+        runner.run();
+
+        runner.assertTransferCount(ConvertJSONToSQL.REL_ORIGINAL, 1);
+        runner.assertTransferCount(ConvertJSONToSQL.REL_SQL, 1);
+        final MockFlowFile out = runner.getFlowFilesForRelationship(ConvertJSONToSQL.REL_SQL).get(0);
+        out.assertAttributeEquals("sql.args.1.type", String.valueOf(java.sql.Types.INTEGER));
+        out.assertAttributeEquals("sql.args.1.value", "1");
+        out.assertAttributeEquals("sql.args.2.type", String.valueOf(java.sql.Types.VARCHAR));
+        out.assertAttributeEquals("sql.args.2.value", "Mark");
+        out.assertAttributeEquals("sql.args.3.type", String.valueOf(java.sql.Types.INTEGER));
+        out.assertAttributeEquals("sql.args.3.value", "48");
+
+        out.assertContentEquals("INSERT INTO PERSONS (ID, NAME, CODE) VALUES (?, ?, ?)");
+    } // End testInsertWithMissingColumnWarning()
+
+    @Test
+    public void testInsertWithMissingColumnIgnore()
+            throws InitializationException, ProcessException, SQLException, IOException {
+        final TestRunner runner = TestRunners.newTestRunner(ConvertJSONToSQL.class);
+        final File tempDir = folder.getRoot();
+        final File dbDir = new File(tempDir, "db");
+        final DBCPService service = new MockDBCPService(dbDir.getAbsolutePath());
+        runner.addControllerService("dbcp", service);
+        runner.enableControllerService(service);
+
+        try (final Connection conn = service.getConnection()) {
+            try (final Statement stmt = conn.createStatement()) {
+                stmt.executeUpdate(
+                        "CREATE TABLE PERSONS (id integer, name varchar(100), code integer, generated_key integer primary key)");
+            }
+        }
+
+        runner.setProperty(ConvertJSONToSQL.CONNECTION_POOL, "dbcp");
+        runner.setProperty(ConvertJSONToSQL.TABLE_NAME, "PERSONS");
+        runner.setProperty(ConvertJSONToSQL.STATEMENT_TYPE, "INSERT");
+        runner.setProperty(ConvertJSONToSQL.UNMATCHED_COLUMN_BEHAVIOR, "Ignore Unmatched Columns");
+        runner.enqueue(Paths.get("src/test/resources/TestConvertJSONToSQL/person-1.json"));
+        runner.run();
+
+        runner.assertTransferCount(ConvertJSONToSQL.REL_ORIGINAL, 1);
+        runner.assertTransferCount(ConvertJSONToSQL.REL_SQL, 1);
+        final MockFlowFile out = runner.getFlowFilesForRelationship(ConvertJSONToSQL.REL_SQL).get(0);
+        out.assertAttributeEquals("sql.args.1.type", String.valueOf(java.sql.Types.INTEGER));
+        out.assertAttributeEquals("sql.args.1.value", "1");
+        out.assertAttributeEquals("sql.args.2.type", String.valueOf(java.sql.Types.VARCHAR));
+        out.assertAttributeEquals("sql.args.2.value", "Mark");
+        out.assertAttributeEquals("sql.args.3.type", String.valueOf(java.sql.Types.INTEGER));
+        out.assertAttributeEquals("sql.args.3.value", "48");
+
+        out.assertContentEquals("INSERT INTO PERSONS (ID, NAME, CODE) VALUES (?, ?, ?)");
+    } // End testInsertWithMissingColumnIgnore()
+
+    @Test
+    public void testUpdateWithMissingColumnFail()
+            throws InitializationException, ProcessException, SQLException, IOException {
+        final TestRunner runner = TestRunners.newTestRunner(ConvertJSONToSQL.class);
+        final File tempDir = folder.getRoot();
+        final File dbDir = new File(tempDir, "db");
+        final DBCPService service = new MockDBCPService(dbDir.getAbsolutePath());
+        runner.addControllerService("dbcp", service);
+        runner.enableControllerService(service);
+
+        try (final Connection conn = service.getConnection()) {
+            try (final Statement stmt = conn.createStatement()) {
+                stmt.executeUpdate(createPersons);
+            }
+        }
+
+        runner.setProperty(ConvertJSONToSQL.CONNECTION_POOL, "dbcp");
+        runner.setProperty(ConvertJSONToSQL.TABLE_NAME, "PERSONS");
+        runner.setProperty(ConvertJSONToSQL.STATEMENT_TYPE, "UPDATE");
+        runner.setProperty(ConvertJSONToSQL.UPDATE_KEY, "name,  code, extra");
+        runner.setProperty(ConvertJSONToSQL.UNMATCHED_COLUMN_BEHAVIOR, "Fail on Unmatched Columns");
+        runner.enqueue(Paths.get("src/test/resources/TestConvertJSONToSQL/person-1.json"));
+        runner.run();
+
+        runner.assertAllFlowFilesTransferred(ConvertJSONToSQL.REL_FAILURE, 1);
+    } // End testUpdateWithMissingColumnFail()
+
+    @Test
+    public void testUpdateWithMissingColumnWarning()
+            throws InitializationException, ProcessException, SQLException, IOException {
+        final TestRunner runner = TestRunners.newTestRunner(ConvertJSONToSQL.class);
+        final File tempDir = folder.getRoot();
+        final File dbDir = new File(tempDir, "db");
+        final DBCPService service = new MockDBCPService(dbDir.getAbsolutePath());
+        runner.addControllerService("dbcp", service);
+        runner.enableControllerService(service);
+
+        try (final Connection conn = service.getConnection()) {
+            try (final Statement stmt = conn.createStatement()) {
+                stmt.executeUpdate(createPersons);
+            }
+        }
+
+        runner.setProperty(ConvertJSONToSQL.CONNECTION_POOL, "dbcp");
+        runner.setProperty(ConvertJSONToSQL.TABLE_NAME, "PERSONS");
+        runner.setProperty(ConvertJSONToSQL.STATEMENT_TYPE, "UPDATE");
+        runner.setProperty(ConvertJSONToSQL.UPDATE_KEY, "name,  code, extra");
+        runner.setProperty(ConvertJSONToSQL.UNMATCHED_COLUMN_BEHAVIOR, "Warn on Unmatched Columns");
+        runner.enqueue(Paths.get("src/test/resources/TestConvertJSONToSQL/person-1.json"));
+        runner.run();
+
+        runner.assertTransferCount(ConvertJSONToSQL.REL_ORIGINAL, 1);
+        runner.assertTransferCount(ConvertJSONToSQL.REL_SQL, 1);
+        final MockFlowFile out = runner.getFlowFilesForRelationship(ConvertJSONToSQL.REL_SQL).get(0);
+        out.assertAttributeEquals("sql.args.1.type", String.valueOf(java.sql.Types.INTEGER));
+        out.assertAttributeEquals("sql.args.1.value", "1");
+        out.assertAttributeEquals("sql.args.2.type", String.valueOf(java.sql.Types.VARCHAR));
+        out.assertAttributeEquals("sql.args.2.value", "Mark");
+        out.assertAttributeEquals("sql.args.3.type", String.valueOf(java.sql.Types.INTEGER));
+        out.assertAttributeEquals("sql.args.3.value", "48");
+
+        out.assertContentEquals("UPDATE PERSONS SET ID = ? WHERE NAME = ? AND CODE = ?");
+
+    } // End testUpdateWithMissingColumnWarning()
+
+    @Test
+    public void testUpdateWithMissingColumnIgnore()
+            throws InitializationException, ProcessException, SQLException, IOException {
+        final TestRunner runner = TestRunners.newTestRunner(ConvertJSONToSQL.class);
+        final File tempDir = folder.getRoot();
+        final File dbDir = new File(tempDir, "db");
+        final DBCPService service = new MockDBCPService(dbDir.getAbsolutePath());
+        runner.addControllerService("dbcp", service);
+        runner.enableControllerService(service);
+
+        try (final Connection conn = service.getConnection()) {
+            try (final Statement stmt = conn.createStatement()) {
+                stmt.executeUpdate(createPersons);
+            }
+        }
+
+        runner.setProperty(ConvertJSONToSQL.CONNECTION_POOL, "dbcp");
+        runner.setProperty(ConvertJSONToSQL.TABLE_NAME, "PERSONS");
+        runner.setProperty(ConvertJSONToSQL.STATEMENT_TYPE, "UPDATE");
+        runner.setProperty(ConvertJSONToSQL.UPDATE_KEY, "name,  code, extra");
+        runner.setProperty(ConvertJSONToSQL.UNMATCHED_COLUMN_BEHAVIOR, "Ignore Unmatched Columns");
+        runner.enqueue(Paths.get("src/test/resources/TestConvertJSONToSQL/person-1.json"));
+        runner.run();
+
+        runner.assertTransferCount(ConvertJSONToSQL.REL_ORIGINAL, 1);
+        runner.assertTransferCount(ConvertJSONToSQL.REL_SQL, 1);
+        final MockFlowFile out = runner.getFlowFilesForRelationship(ConvertJSONToSQL.REL_SQL).get(0);
+        out.assertAttributeEquals("sql.args.1.type", String.valueOf(java.sql.Types.INTEGER));
+        out.assertAttributeEquals("sql.args.1.value", "1");
+        out.assertAttributeEquals("sql.args.2.type", String.valueOf(java.sql.Types.VARCHAR));
+        out.assertAttributeEquals("sql.args.2.value", "Mark");
+        out.assertAttributeEquals("sql.args.3.type", String.valueOf(java.sql.Types.INTEGER));
+        out.assertAttributeEquals("sql.args.3.value", "48");
+
+        out.assertContentEquals("UPDATE PERSONS SET ID = ? WHERE NAME = ? AND CODE = ?");
+
+    } // End testUpdateWithMissingColumnIgnore()
+
 
     /**
      * Simple implementation only for testing purposes


[2/3] nifi git commit: NIFI-1093 - Added settings for processing JSON documents when validating against database columns to fail, log a warning, or ignore unmatched columns. Added tests for new settings.

Posted by ma...@apache.org.
NIFI-1093 - Added settings for processing JSON documents when validating against database columns to fail, log a warning, or ignore unmatched columns. Added tests for new settings.

Signed-off-by: Mark Payne <ma...@hotmail.com>


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/171b9c4e
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/171b9c4e
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/171b9c4e

Branch: refs/heads/master
Commit: 171b9c4e94113dc6e3c42154eb500e046620249e
Parents: dbe8ff3
Author: Daniel Cave <dc...@ssglimited.com>
Authored: Thu Jan 28 18:44:56 2016 +0000
Committer: Mark Payne <ma...@hotmail.com>
Committed: Mon Feb 1 11:45:15 2016 -0500

----------------------------------------------------------------------
 .../processors/standard/ConvertJSONToSQL.java   | 100 ++++++----
 .../standard/TestConvertJSONToSQL.java          | 195 +++++++++++++++++++
 2 files changed, 258 insertions(+), 37 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/171b9c4e/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ConvertJSONToSQL.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ConvertJSONToSQL.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ConvertJSONToSQL.java
old mode 100644
new mode 100755
index 4125eeb..e7226d1
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ConvertJSONToSQL.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ConvertJSONToSQL.java
@@ -74,32 +74,38 @@ import org.codehaus.jackson.node.JsonNodeFactory;
         + "output as a separate FlowFile to the 'sql' relationship. Upon successful conversion, the original FlowFile is routed to the 'original' "
         + "relationship and the SQL is routed to the 'sql' relationship.")
 @WritesAttributes({
-    @WritesAttribute(attribute="mime.type", description="Sets mime.type of FlowFile that is routed to 'sql' to 'text/plain'."),
-    @WritesAttribute(attribute="sql.table", description="Sets the sql.table attribute of FlowFile that is routed to 'sql' to the name of the table that is updated by the SQL statement."),
-    @WritesAttribute(attribute="sql.catalog", description="If the Catalog name is set for this database, specifies the name of the catalog that the SQL statement will update. "
-            + "If no catalog is used, this attribute will not be added."),
-    @WritesAttribute(attribute="fragment.identifier", description="All FlowFiles routed to the 'sql' relationship for the same incoming FlowFile (multiple will be output for the same incoming "
-            + "FlowFile if the incoming FlowFile is a JSON Array) will have the same value for the fragment.identifier attribute. This can then be used to correlate the results."),
-    @WritesAttribute(attribute="fragment.count", description="The number of SQL FlowFiles that were produced for same incoming FlowFile. This can be used in conjunction with the "
-            + "fragment.identifier attribute in order to know how many FlowFiles belonged to the same incoming FlowFile."),
-    @WritesAttribute(attribute="fragment.index", description="The position of this FlowFile in the list of outgoing FlowFiles that were all derived from the same incoming FlowFile. This can be "
-            + "used in conjunction with the fragment.identifier and fragment.count attributes to know which FlowFiles originated from the same incoming FlowFile and in what order the SQL "
-            + "FlowFiles were produced"),
-    @WritesAttribute(attribute="sql.args.N.type", description="The output SQL statements are parameterized in order to avoid SQL Injection Attacks. The types of the Parameters "
-            + "to use are stored in attributes named sql.args.1.type, sql.args.2.type, sql.args.3.type, and so on. The type is a number representing a JDBC Type constant. "
-            + "Generally, this is useful only for software to read and interpret but is added so that a processor such as PutSQL can understand how to interpret the values."),
-    @WritesAttribute(attribute="sql.args.N.value", description="The output SQL statements are parameterized in order to avoid SQL Injection Attacks. The values of the Parameters "
-            + "to use are stored in the attributes named sql.args.1.value, sql.args.2.value, sql.args.3.value, and so on. Each of these attributes has a corresponding "
-            + "sql.args.N.type attribute that indicates how the value should be interpreted when inserting it into the database.")
+        @WritesAttribute(attribute="mime.type", description="Sets mime.type of FlowFile that is routed to 'sql' to 'text/plain'."),
+        @WritesAttribute(attribute="sql.table", description="Sets the sql.table attribute of FlowFile that is routed to 'sql' to the name of the table that is updated by the SQL statement."),
+        @WritesAttribute(attribute="sql.catalog", description="If the Catalog name is set for this database, specifies the name of the catalog that the SQL statement will update. "
+                + "If no catalog is used, this attribute will not be added."),
+        @WritesAttribute(attribute="fragment.identifier", description="All FlowFiles routed to the 'sql' relationship for the same incoming FlowFile (multiple will be output for the same incoming "
+                + "FlowFile if the incoming FlowFile is a JSON Array) will have the same value for the fragment.identifier attribute. This can then be used to correlate the results."),
+        @WritesAttribute(attribute="fragment.count", description="The number of SQL FlowFiles that were produced for same incoming FlowFile. This can be used in conjunction with the "
+                + "fragment.identifier attribute in order to know how many FlowFiles belonged to the same incoming FlowFile."),
+        @WritesAttribute(attribute="fragment.index", description="The position of this FlowFile in the list of outgoing FlowFiles that were all derived from the same incoming FlowFile. This can be "
+                + "used in conjunction with the fragment.identifier and fragment.count attributes to know which FlowFiles originated from the same incoming FlowFile and in what order the SQL "
+                + "FlowFiles were produced"),
+        @WritesAttribute(attribute="sql.args.N.type", description="The output SQL statements are parameterized in order to avoid SQL Injection Attacks. The types of the Parameters "
+                + "to use are stored in attributes named sql.args.1.type, sql.args.2.type, sql.args.3.type, and so on. The type is a number representing a JDBC Type constant. "
+                + "Generally, this is useful only for software to read and interpret but is added so that a processor such as PutSQL can understand how to interpret the values."),
+        @WritesAttribute(attribute="sql.args.N.value", description="The output SQL statements are parameterized in order to avoid SQL Injection Attacks. The values of the Parameters "
+                + "to use are stored in the attributes named sql.args.1.value, sql.args.2.value, sql.args.3.value, and so on. Each of these attributes has a corresponding "
+                + "sql.args.N.type attribute that indicates how the value should be interpreted when inserting it into the database.")
 })
 public class ConvertJSONToSQL extends AbstractProcessor {
     private static final String UPDATE_TYPE = "UPDATE";
     private static final String INSERT_TYPE = "INSERT";
 
     static final AllowableValue IGNORE_UNMATCHED_FIELD = new AllowableValue("Ignore Unmatched Fields", "Ignore Unmatched Fields",
-        "Any field in the JSON document that cannot be mapped to a column in the database is ignored");
+            "Any field in the JSON document that cannot be mapped to a column in the database is ignored");
     static final AllowableValue FAIL_UNMATCHED_FIELD = new AllowableValue("Fail", "Fail",
-        "If the JSON document has any field that cannot be mapped to a column in the database, the FlowFile will be routed to the failure relationship");
+            "If the JSON document has any field that cannot be mapped to a column in the database, the FlowFile will be routed to the failure relationship");
+    static final AllowableValue IGNORE_UNMATCHED_COLUMN = new AllowableValue("Ignore Unmatched Columns", "Ignore Unmatched Columns",
+            "Any column in the database that does not have a field in the JSON document will be assumed to not be required.  No notification will be logged");
+    static final AllowableValue WARNING_UNMATCHED_COLUMN = new AllowableValue("Warning Unmatched Columns", "Warning Unmatched Columns",
+            "Any column in the database that does not have a field in the JSON document will be assumed to not be required.  A warning will be logged");
+    static final AllowableValue FAIL_UNMATCHED_COLUMN = new AllowableValue("Fail Unmatched Columns", "Fail Unmatched Columns",
+            "Any column in the database that does not have a field in the JSON document will fail the flow.  An error will be logged");
 
     static final PropertyDescriptor CONNECTION_POOL = new PropertyDescriptor.Builder()
             .name("JDBC Connection Pool")
@@ -143,16 +149,22 @@ public class ConvertJSONToSQL extends AbstractProcessor {
             .defaultValue("true")
             .build();
     static final PropertyDescriptor UNMATCHED_FIELD_BEHAVIOR = new PropertyDescriptor.Builder()
-        .name("Unmatched Field Behavior")
-        .description("If an incoming JSON element has a field that does not map to any of the database table's columns, this property specifies how to handle the situation")
-        .allowableValues(IGNORE_UNMATCHED_FIELD, FAIL_UNMATCHED_FIELD)
-        .defaultValue(IGNORE_UNMATCHED_FIELD.getValue())
-        .build();
+            .name("Unmatched Field Behavior")
+            .description("If an incoming JSON element has a field that does not map to any of the database table's columns, this property specifies how to handle the situation")
+            .allowableValues(IGNORE_UNMATCHED_FIELD, FAIL_UNMATCHED_FIELD)
+            .defaultValue(IGNORE_UNMATCHED_FIELD.getValue())
+            .build();
+    static final PropertyDescriptor UNMATCHED_COLUMN_BEHAVIOR = new PropertyDescriptor.Builder()
+            .name("Unmatched Column Behavior")
+            .description("If an incoming JSON element does not have a field mapping for all of the database table's columns, this property specifies how to handle the situation")
+            .allowableValues(IGNORE_UNMATCHED_COLUMN, WARNING_UNMATCHED_COLUMN ,FAIL_UNMATCHED_COLUMN)
+            .defaultValue(FAIL_UNMATCHED_COLUMN.getValue())
+            .build();
     static final PropertyDescriptor UPDATE_KEY = new PropertyDescriptor.Builder()
             .name("Update Keys")
             .description("A comma-separated list of column names that uniquely identifies a row in the database for UPDATE statements. "
                     + "If the Statement Type is UPDATE and this property is not set, the table's Primary Keys are used. "
-                    + "In this case, if no Primary Key exists, the conversion to SQL will fail. "
+                    + "In this case, if no Primary Key exists, the conversion to SQL will fail if Unmatched Column Behaviour is set to FAIL. "
                     + "This property is ignored if the Statement Type is INSERT")
             .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
             .required(false)
@@ -193,6 +205,7 @@ public class ConvertJSONToSQL extends AbstractProcessor {
         properties.add(SCHEMA_NAME);
         properties.add(TRANSLATE_FIELD_NAMES);
         properties.add(UNMATCHED_FIELD_BEHAVIOR);
+        properties.add(UNMATCHED_COLUMN_BEHAVIOR);
         properties.add(UPDATE_KEY);
         return properties;
     }
@@ -233,6 +246,10 @@ public class ConvertJSONToSQL extends AbstractProcessor {
         final SchemaKey schemaKey = new SchemaKey(catalog, tableName);
         final boolean includePrimaryKeys = UPDATE_TYPE.equals(statementType) && updateKeys == null;
 
+        // Is the unmatched column behaviour fail or warning?
+        final boolean failUnmappedColumns = FAIL_UNMATCHED_COLUMN.getValue().equalsIgnoreCase(context.getProperty(UNMATCHED_COLUMN_BEHAVIOR).getValue());
+        final boolean warningUnmappedColumns = WARNING_UNMATCHED_COLUMN.getValue().equalsIgnoreCase(context.getProperty(UNMATCHED_COLUMN_BEHAVIOR).getValue());
+
         // get the database schema from the cache, if one exists. We do this in a synchronized block, rather than
         // using a ConcurrentMap because the Map that we are using is a LinkedHashMap with a capacity such that if
         // the Map grows beyond this capacity, old elements are evicted. We do this in order to avoid filling the
@@ -309,9 +326,9 @@ public class ConvertJSONToSQL extends AbstractProcessor {
                 final String fqTableName = tableNameBuilder.toString();
 
                 if (INSERT_TYPE.equals(statementType)) {
-                    sql = generateInsert(jsonNode, attributes, fqTableName, schema, translateFieldNames, ignoreUnmappedFields);
+                    sql = generateInsert(jsonNode, attributes, fqTableName, schema, translateFieldNames, ignoreUnmappedFields, failUnmappedColumns, warningUnmappedColumns);
                 } else {
-                    sql = generateUpdate(jsonNode, attributes, fqTableName, updateKeys, schema, translateFieldNames, ignoreUnmappedFields);
+                    sql = generateUpdate(jsonNode, attributes, fqTableName, updateKeys, schema, translateFieldNames, ignoreUnmappedFields, failUnmappedColumns, warningUnmappedColumns);
                 }
             } catch (final ProcessException pe) {
                 getLogger().error("Failed to convert {} to a SQL {} statement due to {}; routing to failure",
@@ -359,13 +376,18 @@ public class ConvertJSONToSQL extends AbstractProcessor {
     }
 
     private String generateInsert(final JsonNode rootNode, final Map<String, String> attributes, final String tableName,
-        final TableSchema schema, final boolean translateFieldNames, final boolean ignoreUnmappedFields) {
-
+                                  final TableSchema schema, final boolean translateFieldNames, final boolean ignoreUnmappedFields,
+                                  final boolean failUnmappedColumns, final boolean warningUnmappedColumns) {
         final Set<String> normalizedFieldNames = getNormalizedColumnNames(rootNode, translateFieldNames);
         for (final String requiredColName : schema.getRequiredColumnNames()) {
             final String normalizedColName = normalizeColumnName(requiredColName, translateFieldNames);
             if (!normalizedFieldNames.contains(normalizedColName)) {
-                throw new ProcessException("JSON does not have a value for the Required column '" + requiredColName + "'");
+              if(failUnmappedColumns) {
+                    getLogger().error("JSON does not have a value for the Required column '" + requiredColName + "'");
+                    throw new ProcessException("JSON does not have a value for the Required column '" + requiredColName + "'");
+                } else if(warningUnmappedColumns) {
+                    getLogger().warn("JSON does not have a value for the Required column '" + requiredColName + "'");
+                }
             }
         }
 
@@ -426,7 +448,8 @@ public class ConvertJSONToSQL extends AbstractProcessor {
     }
 
     private String generateUpdate(final JsonNode rootNode, final Map<String, String> attributes, final String tableName, final String updateKeys,
-        final TableSchema schema, final boolean translateFieldNames, final boolean ignoreUnmappedFields) {
+                                  final TableSchema schema, final boolean translateFieldNames, final boolean ignoreUnmappedFields,
+                                  final boolean failUnmappedColumns, final boolean warningUnmappedColumns) {
 
         final Set<String> updateKeyNames;
         if (updateKeys == null) {
@@ -454,9 +477,13 @@ public class ConvertJSONToSQL extends AbstractProcessor {
         for (final String uk : updateKeyNames) {
             final String normalizedUK = normalizeColumnName(uk, translateFieldNames);
             normalizedUpdateNames.add(normalizedUK);
-
-            if (!normalizedFieldNames.contains(normalizedUK)) {
-                throw new ProcessException("JSON does not have a value for the " + (updateKeys == null ? "Primary" : "Update") + "Key column '" + uk + "'");
+            if(!normalizedFieldNames.contains(normalizedUK)) {
+              if(failUnmappedColumns) {
+                    getLogger().error("JSON does not have a value for the " + (updateKeys == null ? "Primary" : "Update") + "Key column '" + uk + "'");
+                    throw new ProcessException("JSON does not have a value for the " + (updateKeys == null ? "Primary" : "Update") + "Key column '" + uk + "'");
+                } else if(warningUnmappedColumns) {
+                    getLogger().warn("JSON does not have a value for the " + (updateKeys == null ? "Primary" : "Update") + "Key column '" + uk + "'");
+                }
             }
         }
 
@@ -469,7 +496,6 @@ public class ConvertJSONToSQL extends AbstractProcessor {
 
             final String normalizedColName = normalizeColumnName(fieldName, translateFieldNames);
             final ColumnDescription desc = schema.getColumns().get(normalizedColName);
-
             if (desc == null) {
                 if (!ignoreUnmappedFields) {
                     throw new ProcessException("Cannot map JSON field '" + fieldName + "' to any column in the database");
@@ -554,7 +580,7 @@ public class ConvertJSONToSQL extends AbstractProcessor {
         private Map<String, ColumnDescription> columns;
 
         private TableSchema(final List<ColumnDescription> columnDescriptions, final boolean translateColumnNames,
-                final Set<String> primaryKeyColumnNames) {
+                            final Set<String> primaryKeyColumnNames) {
             this.columns = new HashMap<>();
             this.primaryKeyColumnNames = primaryKeyColumnNames;
 
@@ -580,7 +606,7 @@ public class ConvertJSONToSQL extends AbstractProcessor {
         }
 
         public static TableSchema from(final Connection conn, final String catalog, final String schema, final String tableName,
-                final boolean translateColumnNames, final boolean includePrimaryKeys) throws SQLException {
+                                       final boolean translateColumnNames, final boolean includePrimaryKeys) throws SQLException {
             try (final ResultSet colrs = conn.getMetaData().getColumns(catalog, schema, tableName, "%")) {
 
                 final List<ColumnDescription> cols = new ArrayList<>();

http://git-wip-us.apache.org/repos/asf/nifi/blob/171b9c4e/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestConvertJSONToSQL.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestConvertJSONToSQL.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestConvertJSONToSQL.java
old mode 100644
new mode 100755
index 7422dbc..dd89e96
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestConvertJSONToSQL.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestConvertJSONToSQL.java
@@ -400,6 +400,201 @@ public class TestConvertJSONToSQL {
         runner.assertAllFlowFilesTransferred(ConvertJSONToSQL.REL_FAILURE, 1);
     }
 
+    @Test
+    public void testInsertWithMissingColumnFail() throws InitializationException, ProcessException, SQLException, IOException {
+        final TestRunner runner = TestRunners.newTestRunner(ConvertJSONToSQL.class);
+        final File tempDir = folder.getRoot();
+        final File dbDir = new File(tempDir, "db");
+        final DBCPService service = new MockDBCPService(dbDir.getAbsolutePath());
+        runner.addControllerService("dbcp", service);
+        runner.enableControllerService(service);
+
+        try (final Connection conn = service.getConnection()) {
+            try (final Statement stmt = conn.createStatement()) {
+                stmt.executeUpdate("CREATE TABLE PERSONS (id integer, name varchar(100), code integer, generated_key integer primary key)");
+            }
+        }
+
+        runner.setProperty(ConvertJSONToSQL.CONNECTION_POOL, "dbcp");
+        runner.setProperty(ConvertJSONToSQL.TABLE_NAME, "PERSONS");
+        runner.setProperty(ConvertJSONToSQL.STATEMENT_TYPE, "INSERT");
+        runner.setProperty(ConvertJSONToSQL.UNMATCHED_COLUMN_BEHAVIOR, "Fail Unmatched Columns");
+        runner.enqueue(Paths.get("src/test/resources/TestConvertJSONToSQL/person-1.json"));
+        runner.run();
+
+        runner.assertAllFlowFilesTransferred(ConvertJSONToSQL.REL_FAILURE, 1);
+    } // End testInsertWithMissingColumnFail()
+
+    @Test
+    public void testInsertWithMissingColumnWarning() throws InitializationException, ProcessException, SQLException, IOException {
+        final TestRunner runner = TestRunners.newTestRunner(ConvertJSONToSQL.class);
+        final File tempDir = folder.getRoot();
+        final File dbDir = new File(tempDir, "db");
+        final DBCPService service = new MockDBCPService(dbDir.getAbsolutePath());
+        runner.addControllerService("dbcp", service);
+        runner.enableControllerService(service);
+
+        try (final Connection conn = service.getConnection()) {
+            try (final Statement stmt = conn.createStatement()) {
+                stmt.executeUpdate("CREATE TABLE PERSONS (id integer, name varchar(100), code integer, generated_key integer primary key)");
+            }
+        }
+
+        runner.setProperty(ConvertJSONToSQL.CONNECTION_POOL, "dbcp");
+        runner.setProperty(ConvertJSONToSQL.TABLE_NAME, "PERSONS");
+        runner.setProperty(ConvertJSONToSQL.STATEMENT_TYPE, "INSERT");
+        runner.setProperty(ConvertJSONToSQL.UNMATCHED_COLUMN_BEHAVIOR, "Warning Unmatched Columns");
+        runner.enqueue(Paths.get("src/test/resources/TestConvertJSONToSQL/person-1.json"));
+        runner.run();
+
+        runner.assertTransferCount(ConvertJSONToSQL.REL_ORIGINAL, 1);
+        runner.assertTransferCount(ConvertJSONToSQL.REL_SQL, 1);
+        final MockFlowFile out = runner.getFlowFilesForRelationship(ConvertJSONToSQL.REL_SQL).get(0);
+        out.assertAttributeEquals("sql.args.1.type", String.valueOf(java.sql.Types.INTEGER));
+        out.assertAttributeEquals("sql.args.1.value", "1");
+        out.assertAttributeEquals("sql.args.2.type", String.valueOf(java.sql.Types.VARCHAR));
+        out.assertAttributeEquals("sql.args.2.value", "Mark");
+        out.assertAttributeEquals("sql.args.3.type", String.valueOf(java.sql.Types.INTEGER));
+        out.assertAttributeEquals("sql.args.3.value", "48");
+
+        out.assertContentEquals("INSERT INTO PERSONS (ID, NAME, CODE) VALUES (?, ?, ?)");
+    } // End testInsertWithMissingColumnWarning()
+
+    @Test
+    public void testInsertWithMissingColumnIgnore() throws InitializationException, ProcessException, SQLException, IOException {
+        final TestRunner runner = TestRunners.newTestRunner(ConvertJSONToSQL.class);
+        final File tempDir = folder.getRoot();
+        final File dbDir = new File(tempDir, "db");
+        final DBCPService service = new MockDBCPService(dbDir.getAbsolutePath());
+        runner.addControllerService("dbcp", service);
+        runner.enableControllerService(service);
+
+        try (final Connection conn = service.getConnection()) {
+            try (final Statement stmt = conn.createStatement()) {
+                stmt.executeUpdate("CREATE TABLE PERSONS (id integer, name varchar(100), code integer, generated_key integer primary key)");
+            }
+        }
+
+        runner.setProperty(ConvertJSONToSQL.CONNECTION_POOL, "dbcp");
+        runner.setProperty(ConvertJSONToSQL.TABLE_NAME, "PERSONS");
+        runner.setProperty(ConvertJSONToSQL.STATEMENT_TYPE, "INSERT");
+        runner.setProperty(ConvertJSONToSQL.UNMATCHED_COLUMN_BEHAVIOR, "Ignore Unmatched Columns");
+        runner.enqueue(Paths.get("src/test/resources/TestConvertJSONToSQL/person-1.json"));
+        runner.run();
+
+        runner.assertTransferCount(ConvertJSONToSQL.REL_ORIGINAL, 1);
+        runner.assertTransferCount(ConvertJSONToSQL.REL_SQL, 1);
+        final MockFlowFile out = runner.getFlowFilesForRelationship(ConvertJSONToSQL.REL_SQL).get(0);
+        out.assertAttributeEquals("sql.args.1.type", String.valueOf(java.sql.Types.INTEGER));
+        out.assertAttributeEquals("sql.args.1.value", "1");
+        out.assertAttributeEquals("sql.args.2.type", String.valueOf(java.sql.Types.VARCHAR));
+        out.assertAttributeEquals("sql.args.2.value", "Mark");
+        out.assertAttributeEquals("sql.args.3.type", String.valueOf(java.sql.Types.INTEGER));
+        out.assertAttributeEquals("sql.args.3.value", "48");
+
+        out.assertContentEquals("INSERT INTO PERSONS (ID, NAME, CODE) VALUES (?, ?, ?)");
+    } // End testInsertWithMissingColumnIgnore()
+
+    @Test
+    public void testUpdateWithMissingColumnFail() throws InitializationException, ProcessException, SQLException, IOException {
+        final TestRunner runner = TestRunners.newTestRunner(ConvertJSONToSQL.class);
+        final File tempDir = folder.getRoot();
+        final File dbDir = new File(tempDir, "db");
+        final DBCPService service = new MockDBCPService(dbDir.getAbsolutePath());
+        runner.addControllerService("dbcp", service);
+        runner.enableControllerService(service);
+
+        try (final Connection conn = service.getConnection()) {
+            try (final Statement stmt = conn.createStatement()) {
+                stmt.executeUpdate(createPersons);
+            }
+        }
+
+        runner.setProperty(ConvertJSONToSQL.CONNECTION_POOL, "dbcp");
+        runner.setProperty(ConvertJSONToSQL.TABLE_NAME, "PERSONS");
+        runner.setProperty(ConvertJSONToSQL.STATEMENT_TYPE, "UPDATE");
+        runner.setProperty(ConvertJSONToSQL.UPDATE_KEY, "name,  code, extra");
+        runner.setProperty(ConvertJSONToSQL.UNMATCHED_COLUMN_BEHAVIOR, "Fail Unmatched Columns");
+        runner.enqueue(Paths.get("src/test/resources/TestConvertJSONToSQL/person-1.json"));
+        runner.run();
+
+        runner.assertAllFlowFilesTransferred(ConvertJSONToSQL.REL_FAILURE, 1);
+    } // End testUpdateWithMissingColumnFail()
+
+    @Test
+    public void testUpdateWithMissingColumnWarning() throws InitializationException, ProcessException, SQLException, IOException {
+        final TestRunner runner = TestRunners.newTestRunner(ConvertJSONToSQL.class);
+        final File tempDir = folder.getRoot();
+        final File dbDir = new File(tempDir, "db");
+        final DBCPService service = new MockDBCPService(dbDir.getAbsolutePath());
+        runner.addControllerService("dbcp", service);
+        runner.enableControllerService(service);
+
+        try (final Connection conn = service.getConnection()) {
+            try (final Statement stmt = conn.createStatement()) {
+                stmt.executeUpdate(createPersons);
+            }
+        }
+
+        runner.setProperty(ConvertJSONToSQL.CONNECTION_POOL, "dbcp");
+        runner.setProperty(ConvertJSONToSQL.TABLE_NAME, "PERSONS");
+        runner.setProperty(ConvertJSONToSQL.STATEMENT_TYPE, "UPDATE");
+        runner.setProperty(ConvertJSONToSQL.UPDATE_KEY, "name,  code, extra");
+        runner.setProperty(ConvertJSONToSQL.UNMATCHED_COLUMN_BEHAVIOR, "Warning Unmatched Columns");
+        runner.enqueue(Paths.get("src/test/resources/TestConvertJSONToSQL/person-1.json"));
+        runner.run();
+
+        runner.assertTransferCount(ConvertJSONToSQL.REL_ORIGINAL, 1);
+        runner.assertTransferCount(ConvertJSONToSQL.REL_SQL, 1);
+        final MockFlowFile out = runner.getFlowFilesForRelationship(ConvertJSONToSQL.REL_SQL).get(0);
+        out.assertAttributeEquals("sql.args.1.type", String.valueOf(java.sql.Types.INTEGER));
+        out.assertAttributeEquals("sql.args.1.value", "1");
+        out.assertAttributeEquals("sql.args.2.type", String.valueOf(java.sql.Types.VARCHAR));
+        out.assertAttributeEquals("sql.args.2.value", "Mark");
+        out.assertAttributeEquals("sql.args.3.type", String.valueOf(java.sql.Types.INTEGER));
+        out.assertAttributeEquals("sql.args.3.value", "48");
+
+        out.assertContentEquals("UPDATE PERSONS SET ID = ? WHERE NAME = ? AND CODE = ?");
+
+    } // End testUpdateWithMissingColumnWarning()
+
+    @Test
+    public void testUpdateWithMissingColumnIgnore() throws InitializationException, ProcessException, SQLException, IOException {
+        final TestRunner runner = TestRunners.newTestRunner(ConvertJSONToSQL.class);
+        final File tempDir = folder.getRoot();
+        final File dbDir = new File(tempDir, "db");
+        final DBCPService service = new MockDBCPService(dbDir.getAbsolutePath());
+        runner.addControllerService("dbcp", service);
+        runner.enableControllerService(service);
+
+        try (final Connection conn = service.getConnection()) {
+            try (final Statement stmt = conn.createStatement()) {
+                stmt.executeUpdate(createPersons);
+            }
+        }
+
+        runner.setProperty(ConvertJSONToSQL.CONNECTION_POOL, "dbcp");
+        runner.setProperty(ConvertJSONToSQL.TABLE_NAME, "PERSONS");
+        runner.setProperty(ConvertJSONToSQL.STATEMENT_TYPE, "UPDATE");
+        runner.setProperty(ConvertJSONToSQL.UPDATE_KEY, "name,  code, extra");
+        runner.setProperty(ConvertJSONToSQL.UNMATCHED_COLUMN_BEHAVIOR, "Ignore Unmatched Columns");
+        runner.enqueue(Paths.get("src/test/resources/TestConvertJSONToSQL/person-1.json"));
+        runner.run();
+
+        runner.assertTransferCount(ConvertJSONToSQL.REL_ORIGINAL, 1);
+        runner.assertTransferCount(ConvertJSONToSQL.REL_SQL, 1);
+        final MockFlowFile out = runner.getFlowFilesForRelationship(ConvertJSONToSQL.REL_SQL).get(0);
+        out.assertAttributeEquals("sql.args.1.type", String.valueOf(java.sql.Types.INTEGER));
+        out.assertAttributeEquals("sql.args.1.value", "1");
+        out.assertAttributeEquals("sql.args.2.type", String.valueOf(java.sql.Types.VARCHAR));
+        out.assertAttributeEquals("sql.args.2.value", "Mark");
+        out.assertAttributeEquals("sql.args.3.type", String.valueOf(java.sql.Types.INTEGER));
+        out.assertAttributeEquals("sql.args.3.value", "48");
+
+        out.assertContentEquals("UPDATE PERSONS SET ID = ? WHERE NAME = ? AND CODE = ?");
+
+    } // End testUpdateWithMissingColumnIgnore()
+
 
     /**
      * Simple implementation only for testing purposes


[3/3] nifi git commit: Merge branch 'NIFI-1093' of https://github.com/olegz/nifi into NIFI-1093

Posted by ma...@apache.org.
Merge branch 'NIFI-1093' of https://github.com/olegz/nifi into NIFI-1093


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/ef80549d
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/ef80549d
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/ef80549d

Branch: refs/heads/master
Commit: ef80549d63a5dc347731701d32a80c2446c46523
Parents: 171b9c4 0126800
Author: Mark Payne <ma...@hotmail.com>
Authored: Mon Feb 1 11:51:27 2016 -0500
Committer: Mark Payne <ma...@hotmail.com>
Committed: Mon Feb 1 11:51:27 2016 -0500

----------------------------------------------------------------------
 .../processors/standard/ConvertJSONToSQL.java   | 47 +++++++++++---------
 .../standard/TestConvertJSONToSQL.java          |  8 ++--
 2 files changed, 31 insertions(+), 24 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/ef80549d/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ConvertJSONToSQL.java
----------------------------------------------------------------------
diff --cc nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ConvertJSONToSQL.java
index e7226d1,f8306a2..3eb44cb
mode 100755,100644..100755
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ConvertJSONToSQL.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ConvertJSONToSQL.java
@@@ -97,15 -97,18 +97,18 @@@ public class ConvertJSONToSQL extends A
      private static final String INSERT_TYPE = "INSERT";
  
      static final AllowableValue IGNORE_UNMATCHED_FIELD = new AllowableValue("Ignore Unmatched Fields", "Ignore Unmatched Fields",
 -        "Any field in the JSON document that cannot be mapped to a column in the database is ignored");
 +            "Any field in the JSON document that cannot be mapped to a column in the database is ignored");
      static final AllowableValue FAIL_UNMATCHED_FIELD = new AllowableValue("Fail", "Fail",
-             "If the JSON document has any field that cannot be mapped to a column in the database, the FlowFile will be routed to the failure relationship");
-     static final AllowableValue IGNORE_UNMATCHED_COLUMN = new AllowableValue("Ignore Unmatched Columns", "Ignore Unmatched Columns",
+         "If the JSON document has any field that cannot be mapped to a column in the database, the FlowFile will be routed to the failure relationship");
+     static final AllowableValue IGNORE_UNMATCHED_COLUMN = new AllowableValue("Ignore Unmatched Columns",
+             "Ignore Unmatched Columns",
              "Any column in the database that does not have a field in the JSON document will be assumed to not be required.  No notification will be logged");
-     static final AllowableValue WARNING_UNMATCHED_COLUMN = new AllowableValue("Warning Unmatched Columns", "Warning Unmatched Columns",
+     static final AllowableValue WARNING_UNMATCHED_COLUMN = new AllowableValue("Warn on Unmatched Columns",
 -            "Warning Unmatched Columns",
++            "Warn on Unmatched Columns",
              "Any column in the database that does not have a field in the JSON document will be assumed to not be required.  A warning will be logged");
-     static final AllowableValue FAIL_UNMATCHED_COLUMN = new AllowableValue("Fail Unmatched Columns", "Fail Unmatched Columns",
-             "Any column in the database that does not have a field in the JSON document will fail the flow.  An error will be logged");
+     static final AllowableValue FAIL_UNMATCHED_COLUMN = new AllowableValue("Fail on Unmatched Columns",
+             "Fail on Unmatched Columns",
+             "A flow will fail if any column in the database that does not have a field in the JSON document.  An error will be logged");
  
      static final PropertyDescriptor CONNECTION_POOL = new PropertyDescriptor.Builder()
              .name("JDBC Connection Pool")
@@@ -477,12 -482,15 +482,14 @@@
          for (final String uk : updateKeyNames) {
              final String normalizedUK = normalizeColumnName(uk, translateFieldNames);
              normalizedUpdateNames.add(normalizedUK);
-             if(!normalizedFieldNames.contains(normalizedUK)) {
-               if(failUnmappedColumns) {
-                     getLogger().error("JSON does not have a value for the " + (updateKeys == null ? "Primary" : "Update") + "Key column '" + uk + "'");
-                     throw new ProcessException("JSON does not have a value for the " + (updateKeys == null ? "Primary" : "Update") + "Key column '" + uk + "'");
-                 } else if(warningUnmappedColumns) {
-                     getLogger().warn("JSON does not have a value for the " + (updateKeys == null ? "Primary" : "Update") + "Key column '" + uk + "'");
+ 
+             if (!normalizedFieldNames.contains(normalizedUK)) {
 -                String missingColMessage = "JSON does not have a value for the "
 -                        + (updateKeys == null ? "Primary" : "Update") + "Key column '" + uk + "'";
++                String missingColMessage = "JSON does not have a value for the " + (updateKeys == null ? "Primary" : "Update") + "Key column '" + uk + "'";
+                 if (failUnmappedColumns) {
+                     getLogger().error(missingColMessage);
+                     throw new ProcessException(missingColMessage);
+                 } else if (warningUnmappedColumns) {
+                     getLogger().warn(missingColMessage);
                  }
              }
          }

http://git-wip-us.apache.org/repos/asf/nifi/blob/ef80549d/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestConvertJSONToSQL.java
----------------------------------------------------------------------
diff --cc nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestConvertJSONToSQL.java
index dd89e96,f4d5b96..60d993e
mode 100755,100644..100755
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestConvertJSONToSQL.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestConvertJSONToSQL.java
@@@ -418,7 -420,7 +418,7 @@@ public class TestConvertJSONToSQL 
          runner.setProperty(ConvertJSONToSQL.CONNECTION_POOL, "dbcp");
          runner.setProperty(ConvertJSONToSQL.TABLE_NAME, "PERSONS");
          runner.setProperty(ConvertJSONToSQL.STATEMENT_TYPE, "INSERT");
-         runner.setProperty(ConvertJSONToSQL.UNMATCHED_COLUMN_BEHAVIOR, "Fail Unmatched Columns");
 -        runner.setProperty(ConvertJSONToSQL.UNMATCHED_COLUMN_BEHAVIOR, "Fail on Unmatched Columns");
++        runner.setProperty(ConvertJSONToSQL.UNMATCHED_COLUMN_BEHAVIOR, ConvertJSONToSQL.FAIL_UNMATCHED_COLUMN);
          runner.enqueue(Paths.get("src/test/resources/TestConvertJSONToSQL/person-1.json"));
          runner.run();
  
@@@ -443,7 -447,7 +443,7 @@@
          runner.setProperty(ConvertJSONToSQL.CONNECTION_POOL, "dbcp");
          runner.setProperty(ConvertJSONToSQL.TABLE_NAME, "PERSONS");
          runner.setProperty(ConvertJSONToSQL.STATEMENT_TYPE, "INSERT");
-         runner.setProperty(ConvertJSONToSQL.UNMATCHED_COLUMN_BEHAVIOR, "Warning Unmatched Columns");
 -        runner.setProperty(ConvertJSONToSQL.UNMATCHED_COLUMN_BEHAVIOR, "Warn on Unmatched Columns");
++        runner.setProperty(ConvertJSONToSQL.UNMATCHED_COLUMN_BEHAVIOR, ConvertJSONToSQL.WARNING_UNMATCHED_COLUMN);
          runner.enqueue(Paths.get("src/test/resources/TestConvertJSONToSQL/person-1.json"));
          runner.run();
  
@@@ -514,7 -521,7 +514,7 @@@
          runner.setProperty(ConvertJSONToSQL.TABLE_NAME, "PERSONS");
          runner.setProperty(ConvertJSONToSQL.STATEMENT_TYPE, "UPDATE");
          runner.setProperty(ConvertJSONToSQL.UPDATE_KEY, "name,  code, extra");
-         runner.setProperty(ConvertJSONToSQL.UNMATCHED_COLUMN_BEHAVIOR, "Fail Unmatched Columns");
 -        runner.setProperty(ConvertJSONToSQL.UNMATCHED_COLUMN_BEHAVIOR, "Fail on Unmatched Columns");
++        runner.setProperty(ConvertJSONToSQL.UNMATCHED_COLUMN_BEHAVIOR, ConvertJSONToSQL.FAIL_UNMATCHED_COLUMN);
          runner.enqueue(Paths.get("src/test/resources/TestConvertJSONToSQL/person-1.json"));
          runner.run();
  
@@@ -540,7 -548,7 +540,7 @@@
          runner.setProperty(ConvertJSONToSQL.TABLE_NAME, "PERSONS");
          runner.setProperty(ConvertJSONToSQL.STATEMENT_TYPE, "UPDATE");
          runner.setProperty(ConvertJSONToSQL.UPDATE_KEY, "name,  code, extra");
-         runner.setProperty(ConvertJSONToSQL.UNMATCHED_COLUMN_BEHAVIOR, "Warning Unmatched Columns");
 -        runner.setProperty(ConvertJSONToSQL.UNMATCHED_COLUMN_BEHAVIOR, "Warn on Unmatched Columns");
++        runner.setProperty(ConvertJSONToSQL.UNMATCHED_COLUMN_BEHAVIOR, ConvertJSONToSQL.WARNING_UNMATCHED_COLUMN);
          runner.enqueue(Paths.get("src/test/resources/TestConvertJSONToSQL/person-1.json"));
          runner.run();