You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ma...@apache.org on 2016/02/01 17:55:54 UTC
[1/3] nifi git commit: NIFI-1093 added support for handling
non-required columns. Mostly based on contribution from Daniel Cave
Repository: nifi
Updated Branches:
refs/heads/master dbe8ff3f4 -> ef80549d6
NIFI-1093 added support for handling non-required columns
Mostly based on contribution from Daniel Cave
Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/01268002
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/01268002
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/01268002
Branch: refs/heads/master
Commit: 012680020173a2e3db2df2f7a1cb633bf356c2f0
Parents: 73c0637
Author: Oleg Zhurakousky <ol...@suitcase.io>
Authored: Fri Jan 29 13:41:27 2016 -0500
Committer: Oleg Zhurakousky <ol...@suitcase.io>
Committed: Fri Jan 29 13:41:27 2016 -0500
----------------------------------------------------------------------
.../processors/standard/ConvertJSONToSQL.java | 47 ++++-
.../standard/TestConvertJSONToSQL.java | 204 +++++++++++++++++++
2 files changed, 245 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/nifi/blob/01268002/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ConvertJSONToSQL.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ConvertJSONToSQL.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ConvertJSONToSQL.java
index 4125eeb..f8306a2 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ConvertJSONToSQL.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ConvertJSONToSQL.java
@@ -100,6 +100,15 @@ public class ConvertJSONToSQL extends AbstractProcessor {
"Any field in the JSON document that cannot be mapped to a column in the database is ignored");
static final AllowableValue FAIL_UNMATCHED_FIELD = new AllowableValue("Fail", "Fail",
"If the JSON document has any field that cannot be mapped to a column in the database, the FlowFile will be routed to the failure relationship");
+ static final AllowableValue IGNORE_UNMATCHED_COLUMN = new AllowableValue("Ignore Unmatched Columns",
+ "Ignore Unmatched Columns",
+ "Any column in the database that does not have a field in the JSON document will be assumed to not be required. No notification will be logged");
+ static final AllowableValue WARNING_UNMATCHED_COLUMN = new AllowableValue("Warn on Unmatched Columns",
+ "Warning Unmatched Columns",
+ "Any column in the database that does not have a field in the JSON document will be assumed to not be required. A warning will be logged");
+ static final AllowableValue FAIL_UNMATCHED_COLUMN = new AllowableValue("Fail on Unmatched Columns",
+ "Fail on Unmatched Columns",
+ "A flow will fail if any column in the database that does not have a field in the JSON document. An error will be logged");
static final PropertyDescriptor CONNECTION_POOL = new PropertyDescriptor.Builder()
.name("JDBC Connection Pool")
@@ -159,6 +168,12 @@ public class ConvertJSONToSQL extends AbstractProcessor {
.expressionLanguageSupported(true)
.build();
+ static final PropertyDescriptor UNMATCHED_COLUMN_BEHAVIOR = new PropertyDescriptor.Builder()
+ .name("Unmatched Column Behavior")
+ .description("If an incoming JSON element does not have a field mapping for all of the database table's columns, this property specifies how to handle the situation")
+ .allowableValues(IGNORE_UNMATCHED_COLUMN, WARNING_UNMATCHED_COLUMN ,FAIL_UNMATCHED_COLUMN)
+ .defaultValue(FAIL_UNMATCHED_COLUMN.getValue())
+ .build();
static final Relationship REL_ORIGINAL = new Relationship.Builder()
.name("original")
@@ -193,6 +208,7 @@ public class ConvertJSONToSQL extends AbstractProcessor {
properties.add(SCHEMA_NAME);
properties.add(TRANSLATE_FIELD_NAMES);
properties.add(UNMATCHED_FIELD_BEHAVIOR);
+ properties.add(UNMATCHED_COLUMN_BEHAVIOR);
properties.add(UPDATE_KEY);
return properties;
}
@@ -233,6 +249,10 @@ public class ConvertJSONToSQL extends AbstractProcessor {
final SchemaKey schemaKey = new SchemaKey(catalog, tableName);
final boolean includePrimaryKeys = UPDATE_TYPE.equals(statementType) && updateKeys == null;
+ // Is the unmatched column behaviour fail or warning?
+ final boolean failUnmappedColumns = FAIL_UNMATCHED_COLUMN.getValue().equalsIgnoreCase(context.getProperty(UNMATCHED_COLUMN_BEHAVIOR).getValue());
+ final boolean warningUnmappedColumns = WARNING_UNMATCHED_COLUMN.getValue().equalsIgnoreCase(context.getProperty(UNMATCHED_COLUMN_BEHAVIOR).getValue());
+
// get the database schema from the cache, if one exists. We do this in a synchronized block, rather than
// using a ConcurrentMap because the Map that we are using is a LinkedHashMap with a capacity such that if
// the Map grows beyond this capacity, old elements are evicted. We do this in order to avoid filling the
@@ -309,9 +329,9 @@ public class ConvertJSONToSQL extends AbstractProcessor {
final String fqTableName = tableNameBuilder.toString();
if (INSERT_TYPE.equals(statementType)) {
- sql = generateInsert(jsonNode, attributes, fqTableName, schema, translateFieldNames, ignoreUnmappedFields);
+ sql = generateInsert(jsonNode, attributes, fqTableName, schema, translateFieldNames, ignoreUnmappedFields, failUnmappedColumns, warningUnmappedColumns);
} else {
- sql = generateUpdate(jsonNode, attributes, fqTableName, updateKeys, schema, translateFieldNames, ignoreUnmappedFields);
+ sql = generateUpdate(jsonNode, attributes, fqTableName, updateKeys, schema, translateFieldNames, ignoreUnmappedFields, failUnmappedColumns, warningUnmappedColumns);
}
} catch (final ProcessException pe) {
getLogger().error("Failed to convert {} to a SQL {} statement due to {}; routing to failure",
@@ -359,13 +379,20 @@ public class ConvertJSONToSQL extends AbstractProcessor {
}
private String generateInsert(final JsonNode rootNode, final Map<String, String> attributes, final String tableName,
- final TableSchema schema, final boolean translateFieldNames, final boolean ignoreUnmappedFields) {
+ final TableSchema schema, final boolean translateFieldNames, final boolean ignoreUnmappedFields, final boolean failUnmappedColumns,
+ final boolean warningUnmappedColumns) {
final Set<String> normalizedFieldNames = getNormalizedColumnNames(rootNode, translateFieldNames);
for (final String requiredColName : schema.getRequiredColumnNames()) {
final String normalizedColName = normalizeColumnName(requiredColName, translateFieldNames);
if (!normalizedFieldNames.contains(normalizedColName)) {
- throw new ProcessException("JSON does not have a value for the Required column '" + requiredColName + "'");
+ String missingColMessage = "JSON does not have a value for the Required column '" + requiredColName + "'";
+ if (failUnmappedColumns) {
+ getLogger().error(missingColMessage);
+ throw new ProcessException(missingColMessage);
+ } else if (warningUnmappedColumns) {
+ getLogger().warn(missingColMessage);
+ }
}
}
@@ -426,7 +453,8 @@ public class ConvertJSONToSQL extends AbstractProcessor {
}
private String generateUpdate(final JsonNode rootNode, final Map<String, String> attributes, final String tableName, final String updateKeys,
- final TableSchema schema, final boolean translateFieldNames, final boolean ignoreUnmappedFields) {
+ final TableSchema schema, final boolean translateFieldNames, final boolean ignoreUnmappedFields, final boolean failUnmappedColumns,
+ final boolean warningUnmappedColumns) {
final Set<String> updateKeyNames;
if (updateKeys == null) {
@@ -456,7 +484,14 @@ public class ConvertJSONToSQL extends AbstractProcessor {
normalizedUpdateNames.add(normalizedUK);
if (!normalizedFieldNames.contains(normalizedUK)) {
- throw new ProcessException("JSON does not have a value for the " + (updateKeys == null ? "Primary" : "Update") + "Key column '" + uk + "'");
+ String missingColMessage = "JSON does not have a value for the "
+ + (updateKeys == null ? "Primary" : "Update") + "Key column '" + uk + "'";
+ if (failUnmappedColumns) {
+ getLogger().error(missingColMessage);
+ throw new ProcessException(missingColMessage);
+ } else if (warningUnmappedColumns) {
+ getLogger().warn(missingColMessage);
+ }
}
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/01268002/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestConvertJSONToSQL.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestConvertJSONToSQL.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestConvertJSONToSQL.java
index 7422dbc..f4d5b96 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestConvertJSONToSQL.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestConvertJSONToSQL.java
@@ -400,6 +400,210 @@ public class TestConvertJSONToSQL {
runner.assertAllFlowFilesTransferred(ConvertJSONToSQL.REL_FAILURE, 1);
}
+ @Test
+ public void testInsertWithMissingColumnFail()
+ throws InitializationException, ProcessException, SQLException, IOException {
+ final TestRunner runner = TestRunners.newTestRunner(ConvertJSONToSQL.class);
+ final File tempDir = folder.getRoot();
+ final File dbDir = new File(tempDir, "db");
+ final DBCPService service = new MockDBCPService(dbDir.getAbsolutePath());
+ runner.addControllerService("dbcp", service);
+ runner.enableControllerService(service);
+
+ try (final Connection conn = service.getConnection()) {
+ try (final Statement stmt = conn.createStatement()) {
+ stmt.executeUpdate(
+ "CREATE TABLE PERSONS (id integer, name varchar(100), code integer, generated_key integer primary key)");
+ }
+ }
+
+ runner.setProperty(ConvertJSONToSQL.CONNECTION_POOL, "dbcp");
+ runner.setProperty(ConvertJSONToSQL.TABLE_NAME, "PERSONS");
+ runner.setProperty(ConvertJSONToSQL.STATEMENT_TYPE, "INSERT");
+ runner.setProperty(ConvertJSONToSQL.UNMATCHED_COLUMN_BEHAVIOR, "Fail on Unmatched Columns");
+ runner.enqueue(Paths.get("src/test/resources/TestConvertJSONToSQL/person-1.json"));
+ runner.run();
+
+ runner.assertAllFlowFilesTransferred(ConvertJSONToSQL.REL_FAILURE, 1);
+ } // End testInsertWithMissingColumnFail()
+
+ @Test
+ public void testInsertWithMissingColumnWarning()
+ throws InitializationException, ProcessException, SQLException, IOException {
+ final TestRunner runner = TestRunners.newTestRunner(ConvertJSONToSQL.class);
+ final File tempDir = folder.getRoot();
+ final File dbDir = new File(tempDir, "db");
+ final DBCPService service = new MockDBCPService(dbDir.getAbsolutePath());
+ runner.addControllerService("dbcp", service);
+ runner.enableControllerService(service);
+
+ try (final Connection conn = service.getConnection()) {
+ try (final Statement stmt = conn.createStatement()) {
+ stmt.executeUpdate(
+ "CREATE TABLE PERSONS (id integer, name varchar(100), code integer, generated_key integer primary key)");
+ }
+ }
+
+ runner.setProperty(ConvertJSONToSQL.CONNECTION_POOL, "dbcp");
+ runner.setProperty(ConvertJSONToSQL.TABLE_NAME, "PERSONS");
+ runner.setProperty(ConvertJSONToSQL.STATEMENT_TYPE, "INSERT");
+ runner.setProperty(ConvertJSONToSQL.UNMATCHED_COLUMN_BEHAVIOR, "Warn on Unmatched Columns");
+ runner.enqueue(Paths.get("src/test/resources/TestConvertJSONToSQL/person-1.json"));
+ runner.run();
+
+ runner.assertTransferCount(ConvertJSONToSQL.REL_ORIGINAL, 1);
+ runner.assertTransferCount(ConvertJSONToSQL.REL_SQL, 1);
+ final MockFlowFile out = runner.getFlowFilesForRelationship(ConvertJSONToSQL.REL_SQL).get(0);
+ out.assertAttributeEquals("sql.args.1.type", String.valueOf(java.sql.Types.INTEGER));
+ out.assertAttributeEquals("sql.args.1.value", "1");
+ out.assertAttributeEquals("sql.args.2.type", String.valueOf(java.sql.Types.VARCHAR));
+ out.assertAttributeEquals("sql.args.2.value", "Mark");
+ out.assertAttributeEquals("sql.args.3.type", String.valueOf(java.sql.Types.INTEGER));
+ out.assertAttributeEquals("sql.args.3.value", "48");
+
+ out.assertContentEquals("INSERT INTO PERSONS (ID, NAME, CODE) VALUES (?, ?, ?)");
+ } // End testInsertWithMissingColumnWarning()
+
+ @Test
+ public void testInsertWithMissingColumnIgnore()
+ throws InitializationException, ProcessException, SQLException, IOException {
+ final TestRunner runner = TestRunners.newTestRunner(ConvertJSONToSQL.class);
+ final File tempDir = folder.getRoot();
+ final File dbDir = new File(tempDir, "db");
+ final DBCPService service = new MockDBCPService(dbDir.getAbsolutePath());
+ runner.addControllerService("dbcp", service);
+ runner.enableControllerService(service);
+
+ try (final Connection conn = service.getConnection()) {
+ try (final Statement stmt = conn.createStatement()) {
+ stmt.executeUpdate(
+ "CREATE TABLE PERSONS (id integer, name varchar(100), code integer, generated_key integer primary key)");
+ }
+ }
+
+ runner.setProperty(ConvertJSONToSQL.CONNECTION_POOL, "dbcp");
+ runner.setProperty(ConvertJSONToSQL.TABLE_NAME, "PERSONS");
+ runner.setProperty(ConvertJSONToSQL.STATEMENT_TYPE, "INSERT");
+ runner.setProperty(ConvertJSONToSQL.UNMATCHED_COLUMN_BEHAVIOR, "Ignore Unmatched Columns");
+ runner.enqueue(Paths.get("src/test/resources/TestConvertJSONToSQL/person-1.json"));
+ runner.run();
+
+ runner.assertTransferCount(ConvertJSONToSQL.REL_ORIGINAL, 1);
+ runner.assertTransferCount(ConvertJSONToSQL.REL_SQL, 1);
+ final MockFlowFile out = runner.getFlowFilesForRelationship(ConvertJSONToSQL.REL_SQL).get(0);
+ out.assertAttributeEquals("sql.args.1.type", String.valueOf(java.sql.Types.INTEGER));
+ out.assertAttributeEquals("sql.args.1.value", "1");
+ out.assertAttributeEquals("sql.args.2.type", String.valueOf(java.sql.Types.VARCHAR));
+ out.assertAttributeEquals("sql.args.2.value", "Mark");
+ out.assertAttributeEquals("sql.args.3.type", String.valueOf(java.sql.Types.INTEGER));
+ out.assertAttributeEquals("sql.args.3.value", "48");
+
+ out.assertContentEquals("INSERT INTO PERSONS (ID, NAME, CODE) VALUES (?, ?, ?)");
+ } // End testInsertWithMissingColumnIgnore()
+
+ @Test
+ public void testUpdateWithMissingColumnFail()
+ throws InitializationException, ProcessException, SQLException, IOException {
+ final TestRunner runner = TestRunners.newTestRunner(ConvertJSONToSQL.class);
+ final File tempDir = folder.getRoot();
+ final File dbDir = new File(tempDir, "db");
+ final DBCPService service = new MockDBCPService(dbDir.getAbsolutePath());
+ runner.addControllerService("dbcp", service);
+ runner.enableControllerService(service);
+
+ try (final Connection conn = service.getConnection()) {
+ try (final Statement stmt = conn.createStatement()) {
+ stmt.executeUpdate(createPersons);
+ }
+ }
+
+ runner.setProperty(ConvertJSONToSQL.CONNECTION_POOL, "dbcp");
+ runner.setProperty(ConvertJSONToSQL.TABLE_NAME, "PERSONS");
+ runner.setProperty(ConvertJSONToSQL.STATEMENT_TYPE, "UPDATE");
+ runner.setProperty(ConvertJSONToSQL.UPDATE_KEY, "name, code, extra");
+ runner.setProperty(ConvertJSONToSQL.UNMATCHED_COLUMN_BEHAVIOR, "Fail on Unmatched Columns");
+ runner.enqueue(Paths.get("src/test/resources/TestConvertJSONToSQL/person-1.json"));
+ runner.run();
+
+ runner.assertAllFlowFilesTransferred(ConvertJSONToSQL.REL_FAILURE, 1);
+ } // End testUpdateWithMissingColumnFail()
+
+ @Test
+ public void testUpdateWithMissingColumnWarning()
+ throws InitializationException, ProcessException, SQLException, IOException {
+ final TestRunner runner = TestRunners.newTestRunner(ConvertJSONToSQL.class);
+ final File tempDir = folder.getRoot();
+ final File dbDir = new File(tempDir, "db");
+ final DBCPService service = new MockDBCPService(dbDir.getAbsolutePath());
+ runner.addControllerService("dbcp", service);
+ runner.enableControllerService(service);
+
+ try (final Connection conn = service.getConnection()) {
+ try (final Statement stmt = conn.createStatement()) {
+ stmt.executeUpdate(createPersons);
+ }
+ }
+
+ runner.setProperty(ConvertJSONToSQL.CONNECTION_POOL, "dbcp");
+ runner.setProperty(ConvertJSONToSQL.TABLE_NAME, "PERSONS");
+ runner.setProperty(ConvertJSONToSQL.STATEMENT_TYPE, "UPDATE");
+ runner.setProperty(ConvertJSONToSQL.UPDATE_KEY, "name, code, extra");
+ runner.setProperty(ConvertJSONToSQL.UNMATCHED_COLUMN_BEHAVIOR, "Warn on Unmatched Columns");
+ runner.enqueue(Paths.get("src/test/resources/TestConvertJSONToSQL/person-1.json"));
+ runner.run();
+
+ runner.assertTransferCount(ConvertJSONToSQL.REL_ORIGINAL, 1);
+ runner.assertTransferCount(ConvertJSONToSQL.REL_SQL, 1);
+ final MockFlowFile out = runner.getFlowFilesForRelationship(ConvertJSONToSQL.REL_SQL).get(0);
+ out.assertAttributeEquals("sql.args.1.type", String.valueOf(java.sql.Types.INTEGER));
+ out.assertAttributeEquals("sql.args.1.value", "1");
+ out.assertAttributeEquals("sql.args.2.type", String.valueOf(java.sql.Types.VARCHAR));
+ out.assertAttributeEquals("sql.args.2.value", "Mark");
+ out.assertAttributeEquals("sql.args.3.type", String.valueOf(java.sql.Types.INTEGER));
+ out.assertAttributeEquals("sql.args.3.value", "48");
+
+ out.assertContentEquals("UPDATE PERSONS SET ID = ? WHERE NAME = ? AND CODE = ?");
+
+ } // End testUpdateWithMissingColumnWarning()
+
+ @Test
+ public void testUpdateWithMissingColumnIgnore()
+ throws InitializationException, ProcessException, SQLException, IOException {
+ final TestRunner runner = TestRunners.newTestRunner(ConvertJSONToSQL.class);
+ final File tempDir = folder.getRoot();
+ final File dbDir = new File(tempDir, "db");
+ final DBCPService service = new MockDBCPService(dbDir.getAbsolutePath());
+ runner.addControllerService("dbcp", service);
+ runner.enableControllerService(service);
+
+ try (final Connection conn = service.getConnection()) {
+ try (final Statement stmt = conn.createStatement()) {
+ stmt.executeUpdate(createPersons);
+ }
+ }
+
+ runner.setProperty(ConvertJSONToSQL.CONNECTION_POOL, "dbcp");
+ runner.setProperty(ConvertJSONToSQL.TABLE_NAME, "PERSONS");
+ runner.setProperty(ConvertJSONToSQL.STATEMENT_TYPE, "UPDATE");
+ runner.setProperty(ConvertJSONToSQL.UPDATE_KEY, "name, code, extra");
+ runner.setProperty(ConvertJSONToSQL.UNMATCHED_COLUMN_BEHAVIOR, "Ignore Unmatched Columns");
+ runner.enqueue(Paths.get("src/test/resources/TestConvertJSONToSQL/person-1.json"));
+ runner.run();
+
+ runner.assertTransferCount(ConvertJSONToSQL.REL_ORIGINAL, 1);
+ runner.assertTransferCount(ConvertJSONToSQL.REL_SQL, 1);
+ final MockFlowFile out = runner.getFlowFilesForRelationship(ConvertJSONToSQL.REL_SQL).get(0);
+ out.assertAttributeEquals("sql.args.1.type", String.valueOf(java.sql.Types.INTEGER));
+ out.assertAttributeEquals("sql.args.1.value", "1");
+ out.assertAttributeEquals("sql.args.2.type", String.valueOf(java.sql.Types.VARCHAR));
+ out.assertAttributeEquals("sql.args.2.value", "Mark");
+ out.assertAttributeEquals("sql.args.3.type", String.valueOf(java.sql.Types.INTEGER));
+ out.assertAttributeEquals("sql.args.3.value", "48");
+
+ out.assertContentEquals("UPDATE PERSONS SET ID = ? WHERE NAME = ? AND CODE = ?");
+
+ } // End testUpdateWithMissingColumnIgnore()
+
/**
* Simple implementation only for testing purposes
[2/3] nifi git commit: NIFI-1093 - Added settings for processing JSON
documents when validating against database columns to fail, log a warning,
or ignore unmatched columns. Added tests for new settings.
Posted by ma...@apache.org.
NIFI-1093 - Added settings for processing JSON documents when validating against database columns to fail, log a warning, or ignore unmatched columns. Added tests for new settings.
Signed-off-by: Mark Payne <ma...@hotmail.com>
Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/171b9c4e
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/171b9c4e
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/171b9c4e
Branch: refs/heads/master
Commit: 171b9c4e94113dc6e3c42154eb500e046620249e
Parents: dbe8ff3
Author: Daniel Cave <dc...@ssglimited.com>
Authored: Thu Jan 28 18:44:56 2016 +0000
Committer: Mark Payne <ma...@hotmail.com>
Committed: Mon Feb 1 11:45:15 2016 -0500
----------------------------------------------------------------------
.../processors/standard/ConvertJSONToSQL.java | 100 ++++++----
.../standard/TestConvertJSONToSQL.java | 195 +++++++++++++++++++
2 files changed, 258 insertions(+), 37 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/nifi/blob/171b9c4e/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ConvertJSONToSQL.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ConvertJSONToSQL.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ConvertJSONToSQL.java
old mode 100644
new mode 100755
index 4125eeb..e7226d1
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ConvertJSONToSQL.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ConvertJSONToSQL.java
@@ -74,32 +74,38 @@ import org.codehaus.jackson.node.JsonNodeFactory;
+ "output as a separate FlowFile to the 'sql' relationship. Upon successful conversion, the original FlowFile is routed to the 'original' "
+ "relationship and the SQL is routed to the 'sql' relationship.")
@WritesAttributes({
- @WritesAttribute(attribute="mime.type", description="Sets mime.type of FlowFile that is routed to 'sql' to 'text/plain'."),
- @WritesAttribute(attribute="sql.table", description="Sets the sql.table attribute of FlowFile that is routed to 'sql' to the name of the table that is updated by the SQL statement."),
- @WritesAttribute(attribute="sql.catalog", description="If the Catalog name is set for this database, specifies the name of the catalog that the SQL statement will update. "
- + "If no catalog is used, this attribute will not be added."),
- @WritesAttribute(attribute="fragment.identifier", description="All FlowFiles routed to the 'sql' relationship for the same incoming FlowFile (multiple will be output for the same incoming "
- + "FlowFile if the incoming FlowFile is a JSON Array) will have the same value for the fragment.identifier attribute. This can then be used to correlate the results."),
- @WritesAttribute(attribute="fragment.count", description="The number of SQL FlowFiles that were produced for same incoming FlowFile. This can be used in conjunction with the "
- + "fragment.identifier attribute in order to know how many FlowFiles belonged to the same incoming FlowFile."),
- @WritesAttribute(attribute="fragment.index", description="The position of this FlowFile in the list of outgoing FlowFiles that were all derived from the same incoming FlowFile. This can be "
- + "used in conjunction with the fragment.identifier and fragment.count attributes to know which FlowFiles originated from the same incoming FlowFile and in what order the SQL "
- + "FlowFiles were produced"),
- @WritesAttribute(attribute="sql.args.N.type", description="The output SQL statements are parameterized in order to avoid SQL Injection Attacks. The types of the Parameters "
- + "to use are stored in attributes named sql.args.1.type, sql.args.2.type, sql.args.3.type, and so on. The type is a number representing a JDBC Type constant. "
- + "Generally, this is useful only for software to read and interpret but is added so that a processor such as PutSQL can understand how to interpret the values."),
- @WritesAttribute(attribute="sql.args.N.value", description="The output SQL statements are parameterized in order to avoid SQL Injection Attacks. The values of the Parameters "
- + "to use are stored in the attributes named sql.args.1.value, sql.args.2.value, sql.args.3.value, and so on. Each of these attributes has a corresponding "
- + "sql.args.N.type attribute that indicates how the value should be interpreted when inserting it into the database.")
+ @WritesAttribute(attribute="mime.type", description="Sets mime.type of FlowFile that is routed to 'sql' to 'text/plain'."),
+ @WritesAttribute(attribute="sql.table", description="Sets the sql.table attribute of FlowFile that is routed to 'sql' to the name of the table that is updated by the SQL statement."),
+ @WritesAttribute(attribute="sql.catalog", description="If the Catalog name is set for this database, specifies the name of the catalog that the SQL statement will update. "
+ + "If no catalog is used, this attribute will not be added."),
+ @WritesAttribute(attribute="fragment.identifier", description="All FlowFiles routed to the 'sql' relationship for the same incoming FlowFile (multiple will be output for the same incoming "
+ + "FlowFile if the incoming FlowFile is a JSON Array) will have the same value for the fragment.identifier attribute. This can then be used to correlate the results."),
+ @WritesAttribute(attribute="fragment.count", description="The number of SQL FlowFiles that were produced for same incoming FlowFile. This can be used in conjunction with the "
+ + "fragment.identifier attribute in order to know how many FlowFiles belonged to the same incoming FlowFile."),
+ @WritesAttribute(attribute="fragment.index", description="The position of this FlowFile in the list of outgoing FlowFiles that were all derived from the same incoming FlowFile. This can be "
+ + "used in conjunction with the fragment.identifier and fragment.count attributes to know which FlowFiles originated from the same incoming FlowFile and in what order the SQL "
+ + "FlowFiles were produced"),
+ @WritesAttribute(attribute="sql.args.N.type", description="The output SQL statements are parameterized in order to avoid SQL Injection Attacks. The types of the Parameters "
+ + "to use are stored in attributes named sql.args.1.type, sql.args.2.type, sql.args.3.type, and so on. The type is a number representing a JDBC Type constant. "
+ + "Generally, this is useful only for software to read and interpret but is added so that a processor such as PutSQL can understand how to interpret the values."),
+ @WritesAttribute(attribute="sql.args.N.value", description="The output SQL statements are parameterized in order to avoid SQL Injection Attacks. The values of the Parameters "
+ + "to use are stored in the attributes named sql.args.1.value, sql.args.2.value, sql.args.3.value, and so on. Each of these attributes has a corresponding "
+ + "sql.args.N.type attribute that indicates how the value should be interpreted when inserting it into the database.")
})
public class ConvertJSONToSQL extends AbstractProcessor {
private static final String UPDATE_TYPE = "UPDATE";
private static final String INSERT_TYPE = "INSERT";
static final AllowableValue IGNORE_UNMATCHED_FIELD = new AllowableValue("Ignore Unmatched Fields", "Ignore Unmatched Fields",
- "Any field in the JSON document that cannot be mapped to a column in the database is ignored");
+ "Any field in the JSON document that cannot be mapped to a column in the database is ignored");
static final AllowableValue FAIL_UNMATCHED_FIELD = new AllowableValue("Fail", "Fail",
- "If the JSON document has any field that cannot be mapped to a column in the database, the FlowFile will be routed to the failure relationship");
+ "If the JSON document has any field that cannot be mapped to a column in the database, the FlowFile will be routed to the failure relationship");
+ static final AllowableValue IGNORE_UNMATCHED_COLUMN = new AllowableValue("Ignore Unmatched Columns", "Ignore Unmatched Columns",
+ "Any column in the database that does not have a field in the JSON document will be assumed to not be required. No notification will be logged");
+ static final AllowableValue WARNING_UNMATCHED_COLUMN = new AllowableValue("Warning Unmatched Columns", "Warning Unmatched Columns",
+ "Any column in the database that does not have a field in the JSON document will be assumed to not be required. A warning will be logged");
+ static final AllowableValue FAIL_UNMATCHED_COLUMN = new AllowableValue("Fail Unmatched Columns", "Fail Unmatched Columns",
+ "Any column in the database that does not have a field in the JSON document will fail the flow. An error will be logged");
static final PropertyDescriptor CONNECTION_POOL = new PropertyDescriptor.Builder()
.name("JDBC Connection Pool")
@@ -143,16 +149,22 @@ public class ConvertJSONToSQL extends AbstractProcessor {
.defaultValue("true")
.build();
static final PropertyDescriptor UNMATCHED_FIELD_BEHAVIOR = new PropertyDescriptor.Builder()
- .name("Unmatched Field Behavior")
- .description("If an incoming JSON element has a field that does not map to any of the database table's columns, this property specifies how to handle the situation")
- .allowableValues(IGNORE_UNMATCHED_FIELD, FAIL_UNMATCHED_FIELD)
- .defaultValue(IGNORE_UNMATCHED_FIELD.getValue())
- .build();
+ .name("Unmatched Field Behavior")
+ .description("If an incoming JSON element has a field that does not map to any of the database table's columns, this property specifies how to handle the situation")
+ .allowableValues(IGNORE_UNMATCHED_FIELD, FAIL_UNMATCHED_FIELD)
+ .defaultValue(IGNORE_UNMATCHED_FIELD.getValue())
+ .build();
+ static final PropertyDescriptor UNMATCHED_COLUMN_BEHAVIOR = new PropertyDescriptor.Builder()
+ .name("Unmatched Column Behavior")
+ .description("If an incoming JSON element does not have a field mapping for all of the database table's columns, this property specifies how to handle the situation")
+ .allowableValues(IGNORE_UNMATCHED_COLUMN, WARNING_UNMATCHED_COLUMN ,FAIL_UNMATCHED_COLUMN)
+ .defaultValue(FAIL_UNMATCHED_COLUMN.getValue())
+ .build();
static final PropertyDescriptor UPDATE_KEY = new PropertyDescriptor.Builder()
.name("Update Keys")
.description("A comma-separated list of column names that uniquely identifies a row in the database for UPDATE statements. "
+ "If the Statement Type is UPDATE and this property is not set, the table's Primary Keys are used. "
- + "In this case, if no Primary Key exists, the conversion to SQL will fail. "
+ + "In this case, if no Primary Key exists, the conversion to SQL will fail if Unmatched Column Behaviour is set to FAIL. "
+ "This property is ignored if the Statement Type is INSERT")
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.required(false)
@@ -193,6 +205,7 @@ public class ConvertJSONToSQL extends AbstractProcessor {
properties.add(SCHEMA_NAME);
properties.add(TRANSLATE_FIELD_NAMES);
properties.add(UNMATCHED_FIELD_BEHAVIOR);
+ properties.add(UNMATCHED_COLUMN_BEHAVIOR);
properties.add(UPDATE_KEY);
return properties;
}
@@ -233,6 +246,10 @@ public class ConvertJSONToSQL extends AbstractProcessor {
final SchemaKey schemaKey = new SchemaKey(catalog, tableName);
final boolean includePrimaryKeys = UPDATE_TYPE.equals(statementType) && updateKeys == null;
+ // Is the unmatched column behaviour fail or warning?
+ final boolean failUnmappedColumns = FAIL_UNMATCHED_COLUMN.getValue().equalsIgnoreCase(context.getProperty(UNMATCHED_COLUMN_BEHAVIOR).getValue());
+ final boolean warningUnmappedColumns = WARNING_UNMATCHED_COLUMN.getValue().equalsIgnoreCase(context.getProperty(UNMATCHED_COLUMN_BEHAVIOR).getValue());
+
// get the database schema from the cache, if one exists. We do this in a synchronized block, rather than
// using a ConcurrentMap because the Map that we are using is a LinkedHashMap with a capacity such that if
// the Map grows beyond this capacity, old elements are evicted. We do this in order to avoid filling the
@@ -309,9 +326,9 @@ public class ConvertJSONToSQL extends AbstractProcessor {
final String fqTableName = tableNameBuilder.toString();
if (INSERT_TYPE.equals(statementType)) {
- sql = generateInsert(jsonNode, attributes, fqTableName, schema, translateFieldNames, ignoreUnmappedFields);
+ sql = generateInsert(jsonNode, attributes, fqTableName, schema, translateFieldNames, ignoreUnmappedFields, failUnmappedColumns, warningUnmappedColumns);
} else {
- sql = generateUpdate(jsonNode, attributes, fqTableName, updateKeys, schema, translateFieldNames, ignoreUnmappedFields);
+ sql = generateUpdate(jsonNode, attributes, fqTableName, updateKeys, schema, translateFieldNames, ignoreUnmappedFields, failUnmappedColumns, warningUnmappedColumns);
}
} catch (final ProcessException pe) {
getLogger().error("Failed to convert {} to a SQL {} statement due to {}; routing to failure",
@@ -359,13 +376,18 @@ public class ConvertJSONToSQL extends AbstractProcessor {
}
private String generateInsert(final JsonNode rootNode, final Map<String, String> attributes, final String tableName,
- final TableSchema schema, final boolean translateFieldNames, final boolean ignoreUnmappedFields) {
-
+ final TableSchema schema, final boolean translateFieldNames, final boolean ignoreUnmappedFields,
+ final boolean failUnmappedColumns, final boolean warningUnmappedColumns) {
final Set<String> normalizedFieldNames = getNormalizedColumnNames(rootNode, translateFieldNames);
for (final String requiredColName : schema.getRequiredColumnNames()) {
final String normalizedColName = normalizeColumnName(requiredColName, translateFieldNames);
if (!normalizedFieldNames.contains(normalizedColName)) {
- throw new ProcessException("JSON does not have a value for the Required column '" + requiredColName + "'");
+ if(failUnmappedColumns) {
+ getLogger().error("JSON does not have a value for the Required column '" + requiredColName + "'");
+ throw new ProcessException("JSON does not have a value for the Required column '" + requiredColName + "'");
+ } else if(warningUnmappedColumns) {
+ getLogger().warn("JSON does not have a value for the Required column '" + requiredColName + "'");
+ }
}
}
@@ -426,7 +448,8 @@ public class ConvertJSONToSQL extends AbstractProcessor {
}
private String generateUpdate(final JsonNode rootNode, final Map<String, String> attributes, final String tableName, final String updateKeys,
- final TableSchema schema, final boolean translateFieldNames, final boolean ignoreUnmappedFields) {
+ final TableSchema schema, final boolean translateFieldNames, final boolean ignoreUnmappedFields,
+ final boolean failUnmappedColumns, final boolean warningUnmappedColumns) {
final Set<String> updateKeyNames;
if (updateKeys == null) {
@@ -454,9 +477,13 @@ public class ConvertJSONToSQL extends AbstractProcessor {
for (final String uk : updateKeyNames) {
final String normalizedUK = normalizeColumnName(uk, translateFieldNames);
normalizedUpdateNames.add(normalizedUK);
-
- if (!normalizedFieldNames.contains(normalizedUK)) {
- throw new ProcessException("JSON does not have a value for the " + (updateKeys == null ? "Primary" : "Update") + "Key column '" + uk + "'");
+ if(!normalizedFieldNames.contains(normalizedUK)) {
+ if(failUnmappedColumns) {
+ getLogger().error("JSON does not have a value for the " + (updateKeys == null ? "Primary" : "Update") + "Key column '" + uk + "'");
+ throw new ProcessException("JSON does not have a value for the " + (updateKeys == null ? "Primary" : "Update") + "Key column '" + uk + "'");
+ } else if(warningUnmappedColumns) {
+ getLogger().warn("JSON does not have a value for the " + (updateKeys == null ? "Primary" : "Update") + "Key column '" + uk + "'");
+ }
}
}
@@ -469,7 +496,6 @@ public class ConvertJSONToSQL extends AbstractProcessor {
final String normalizedColName = normalizeColumnName(fieldName, translateFieldNames);
final ColumnDescription desc = schema.getColumns().get(normalizedColName);
-
if (desc == null) {
if (!ignoreUnmappedFields) {
throw new ProcessException("Cannot map JSON field '" + fieldName + "' to any column in the database");
@@ -554,7 +580,7 @@ public class ConvertJSONToSQL extends AbstractProcessor {
private Map<String, ColumnDescription> columns;
private TableSchema(final List<ColumnDescription> columnDescriptions, final boolean translateColumnNames,
- final Set<String> primaryKeyColumnNames) {
+ final Set<String> primaryKeyColumnNames) {
this.columns = new HashMap<>();
this.primaryKeyColumnNames = primaryKeyColumnNames;
@@ -580,7 +606,7 @@ public class ConvertJSONToSQL extends AbstractProcessor {
}
public static TableSchema from(final Connection conn, final String catalog, final String schema, final String tableName,
- final boolean translateColumnNames, final boolean includePrimaryKeys) throws SQLException {
+ final boolean translateColumnNames, final boolean includePrimaryKeys) throws SQLException {
try (final ResultSet colrs = conn.getMetaData().getColumns(catalog, schema, tableName, "%")) {
final List<ColumnDescription> cols = new ArrayList<>();
http://git-wip-us.apache.org/repos/asf/nifi/blob/171b9c4e/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestConvertJSONToSQL.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestConvertJSONToSQL.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestConvertJSONToSQL.java
old mode 100644
new mode 100755
index 7422dbc..dd89e96
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestConvertJSONToSQL.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestConvertJSONToSQL.java
@@ -400,6 +400,201 @@ public class TestConvertJSONToSQL {
runner.assertAllFlowFilesTransferred(ConvertJSONToSQL.REL_FAILURE, 1);
}
+ @Test
+ public void testInsertWithMissingColumnFail() throws InitializationException, ProcessException, SQLException, IOException {
+ final TestRunner runner = TestRunners.newTestRunner(ConvertJSONToSQL.class);
+ final File tempDir = folder.getRoot();
+ final File dbDir = new File(tempDir, "db");
+ final DBCPService service = new MockDBCPService(dbDir.getAbsolutePath());
+ runner.addControllerService("dbcp", service);
+ runner.enableControllerService(service);
+
+ try (final Connection conn = service.getConnection()) {
+ try (final Statement stmt = conn.createStatement()) {
+ stmt.executeUpdate("CREATE TABLE PERSONS (id integer, name varchar(100), code integer, generated_key integer primary key)");
+ }
+ }
+
+ runner.setProperty(ConvertJSONToSQL.CONNECTION_POOL, "dbcp");
+ runner.setProperty(ConvertJSONToSQL.TABLE_NAME, "PERSONS");
+ runner.setProperty(ConvertJSONToSQL.STATEMENT_TYPE, "INSERT");
+ runner.setProperty(ConvertJSONToSQL.UNMATCHED_COLUMN_BEHAVIOR, "Fail Unmatched Columns");
+ runner.enqueue(Paths.get("src/test/resources/TestConvertJSONToSQL/person-1.json"));
+ runner.run();
+
+ runner.assertAllFlowFilesTransferred(ConvertJSONToSQL.REL_FAILURE, 1);
+ } // End testInsertWithMissingColumnFail()
+
+ @Test
+ public void testInsertWithMissingColumnWarning() throws InitializationException, ProcessException, SQLException, IOException {
+ final TestRunner runner = TestRunners.newTestRunner(ConvertJSONToSQL.class);
+ final File tempDir = folder.getRoot();
+ final File dbDir = new File(tempDir, "db");
+ final DBCPService service = new MockDBCPService(dbDir.getAbsolutePath());
+ runner.addControllerService("dbcp", service);
+ runner.enableControllerService(service);
+
+ try (final Connection conn = service.getConnection()) {
+ try (final Statement stmt = conn.createStatement()) {
+ stmt.executeUpdate("CREATE TABLE PERSONS (id integer, name varchar(100), code integer, generated_key integer primary key)");
+ }
+ }
+
+ runner.setProperty(ConvertJSONToSQL.CONNECTION_POOL, "dbcp");
+ runner.setProperty(ConvertJSONToSQL.TABLE_NAME, "PERSONS");
+ runner.setProperty(ConvertJSONToSQL.STATEMENT_TYPE, "INSERT");
+ runner.setProperty(ConvertJSONToSQL.UNMATCHED_COLUMN_BEHAVIOR, "Warning Unmatched Columns");
+ runner.enqueue(Paths.get("src/test/resources/TestConvertJSONToSQL/person-1.json"));
+ runner.run();
+
+ runner.assertTransferCount(ConvertJSONToSQL.REL_ORIGINAL, 1);
+ runner.assertTransferCount(ConvertJSONToSQL.REL_SQL, 1);
+ final MockFlowFile out = runner.getFlowFilesForRelationship(ConvertJSONToSQL.REL_SQL).get(0);
+ out.assertAttributeEquals("sql.args.1.type", String.valueOf(java.sql.Types.INTEGER));
+ out.assertAttributeEquals("sql.args.1.value", "1");
+ out.assertAttributeEquals("sql.args.2.type", String.valueOf(java.sql.Types.VARCHAR));
+ out.assertAttributeEquals("sql.args.2.value", "Mark");
+ out.assertAttributeEquals("sql.args.3.type", String.valueOf(java.sql.Types.INTEGER));
+ out.assertAttributeEquals("sql.args.3.value", "48");
+
+ out.assertContentEquals("INSERT INTO PERSONS (ID, NAME, CODE) VALUES (?, ?, ?)");
+ } // End testInsertWithMissingColumnWarning()
+
+ @Test
+ public void testInsertWithMissingColumnIgnore() throws InitializationException, ProcessException, SQLException, IOException {
+ final TestRunner runner = TestRunners.newTestRunner(ConvertJSONToSQL.class);
+ final File tempDir = folder.getRoot();
+ final File dbDir = new File(tempDir, "db");
+ final DBCPService service = new MockDBCPService(dbDir.getAbsolutePath());
+ runner.addControllerService("dbcp", service);
+ runner.enableControllerService(service);
+
+ try (final Connection conn = service.getConnection()) {
+ try (final Statement stmt = conn.createStatement()) {
+ stmt.executeUpdate("CREATE TABLE PERSONS (id integer, name varchar(100), code integer, generated_key integer primary key)");
+ }
+ }
+
+ runner.setProperty(ConvertJSONToSQL.CONNECTION_POOL, "dbcp");
+ runner.setProperty(ConvertJSONToSQL.TABLE_NAME, "PERSONS");
+ runner.setProperty(ConvertJSONToSQL.STATEMENT_TYPE, "INSERT");
+ runner.setProperty(ConvertJSONToSQL.UNMATCHED_COLUMN_BEHAVIOR, "Ignore Unmatched Columns");
+ runner.enqueue(Paths.get("src/test/resources/TestConvertJSONToSQL/person-1.json"));
+ runner.run();
+
+ runner.assertTransferCount(ConvertJSONToSQL.REL_ORIGINAL, 1);
+ runner.assertTransferCount(ConvertJSONToSQL.REL_SQL, 1);
+ final MockFlowFile out = runner.getFlowFilesForRelationship(ConvertJSONToSQL.REL_SQL).get(0);
+ out.assertAttributeEquals("sql.args.1.type", String.valueOf(java.sql.Types.INTEGER));
+ out.assertAttributeEquals("sql.args.1.value", "1");
+ out.assertAttributeEquals("sql.args.2.type", String.valueOf(java.sql.Types.VARCHAR));
+ out.assertAttributeEquals("sql.args.2.value", "Mark");
+ out.assertAttributeEquals("sql.args.3.type", String.valueOf(java.sql.Types.INTEGER));
+ out.assertAttributeEquals("sql.args.3.value", "48");
+
+ out.assertContentEquals("INSERT INTO PERSONS (ID, NAME, CODE) VALUES (?, ?, ?)");
+ } // End testInsertWithMissingColumnIgnore()
+
+ @Test
+ public void testUpdateWithMissingColumnFail() throws InitializationException, ProcessException, SQLException, IOException {
+ final TestRunner runner = TestRunners.newTestRunner(ConvertJSONToSQL.class);
+ final File tempDir = folder.getRoot();
+ final File dbDir = new File(tempDir, "db");
+ final DBCPService service = new MockDBCPService(dbDir.getAbsolutePath());
+ runner.addControllerService("dbcp", service);
+ runner.enableControllerService(service);
+
+ try (final Connection conn = service.getConnection()) {
+ try (final Statement stmt = conn.createStatement()) {
+ stmt.executeUpdate(createPersons);
+ }
+ }
+
+ runner.setProperty(ConvertJSONToSQL.CONNECTION_POOL, "dbcp");
+ runner.setProperty(ConvertJSONToSQL.TABLE_NAME, "PERSONS");
+ runner.setProperty(ConvertJSONToSQL.STATEMENT_TYPE, "UPDATE");
+ runner.setProperty(ConvertJSONToSQL.UPDATE_KEY, "name, code, extra");
+ runner.setProperty(ConvertJSONToSQL.UNMATCHED_COLUMN_BEHAVIOR, "Fail Unmatched Columns");
+ runner.enqueue(Paths.get("src/test/resources/TestConvertJSONToSQL/person-1.json"));
+ runner.run();
+
+ runner.assertAllFlowFilesTransferred(ConvertJSONToSQL.REL_FAILURE, 1);
+ } // End testUpdateWithMissingColumnFail()
+
+ @Test
+ public void testUpdateWithMissingColumnWarning() throws InitializationException, ProcessException, SQLException, IOException {
+ final TestRunner runner = TestRunners.newTestRunner(ConvertJSONToSQL.class);
+ final File tempDir = folder.getRoot();
+ final File dbDir = new File(tempDir, "db");
+ final DBCPService service = new MockDBCPService(dbDir.getAbsolutePath());
+ runner.addControllerService("dbcp", service);
+ runner.enableControllerService(service);
+
+ try (final Connection conn = service.getConnection()) {
+ try (final Statement stmt = conn.createStatement()) {
+ stmt.executeUpdate(createPersons);
+ }
+ }
+
+ runner.setProperty(ConvertJSONToSQL.CONNECTION_POOL, "dbcp");
+ runner.setProperty(ConvertJSONToSQL.TABLE_NAME, "PERSONS");
+ runner.setProperty(ConvertJSONToSQL.STATEMENT_TYPE, "UPDATE");
+ runner.setProperty(ConvertJSONToSQL.UPDATE_KEY, "name, code, extra");
+ runner.setProperty(ConvertJSONToSQL.UNMATCHED_COLUMN_BEHAVIOR, "Warning Unmatched Columns");
+ runner.enqueue(Paths.get("src/test/resources/TestConvertJSONToSQL/person-1.json"));
+ runner.run();
+
+ runner.assertTransferCount(ConvertJSONToSQL.REL_ORIGINAL, 1);
+ runner.assertTransferCount(ConvertJSONToSQL.REL_SQL, 1);
+ final MockFlowFile out = runner.getFlowFilesForRelationship(ConvertJSONToSQL.REL_SQL).get(0);
+ out.assertAttributeEquals("sql.args.1.type", String.valueOf(java.sql.Types.INTEGER));
+ out.assertAttributeEquals("sql.args.1.value", "1");
+ out.assertAttributeEquals("sql.args.2.type", String.valueOf(java.sql.Types.VARCHAR));
+ out.assertAttributeEquals("sql.args.2.value", "Mark");
+ out.assertAttributeEquals("sql.args.3.type", String.valueOf(java.sql.Types.INTEGER));
+ out.assertAttributeEquals("sql.args.3.value", "48");
+
+ out.assertContentEquals("UPDATE PERSONS SET ID = ? WHERE NAME = ? AND CODE = ?");
+
+ } // End testUpdateWithMissingColumnWarning()
+
+ @Test
+ public void testUpdateWithMissingColumnIgnore() throws InitializationException, ProcessException, SQLException, IOException {
+ final TestRunner runner = TestRunners.newTestRunner(ConvertJSONToSQL.class);
+ final File tempDir = folder.getRoot();
+ final File dbDir = new File(tempDir, "db");
+ final DBCPService service = new MockDBCPService(dbDir.getAbsolutePath());
+ runner.addControllerService("dbcp", service);
+ runner.enableControllerService(service);
+
+ try (final Connection conn = service.getConnection()) {
+ try (final Statement stmt = conn.createStatement()) {
+ stmt.executeUpdate(createPersons);
+ }
+ }
+
+ runner.setProperty(ConvertJSONToSQL.CONNECTION_POOL, "dbcp");
+ runner.setProperty(ConvertJSONToSQL.TABLE_NAME, "PERSONS");
+ runner.setProperty(ConvertJSONToSQL.STATEMENT_TYPE, "UPDATE");
+ runner.setProperty(ConvertJSONToSQL.UPDATE_KEY, "name, code, extra");
+ runner.setProperty(ConvertJSONToSQL.UNMATCHED_COLUMN_BEHAVIOR, "Ignore Unmatched Columns");
+ runner.enqueue(Paths.get("src/test/resources/TestConvertJSONToSQL/person-1.json"));
+ runner.run();
+
+ runner.assertTransferCount(ConvertJSONToSQL.REL_ORIGINAL, 1);
+ runner.assertTransferCount(ConvertJSONToSQL.REL_SQL, 1);
+ final MockFlowFile out = runner.getFlowFilesForRelationship(ConvertJSONToSQL.REL_SQL).get(0);
+ out.assertAttributeEquals("sql.args.1.type", String.valueOf(java.sql.Types.INTEGER));
+ out.assertAttributeEquals("sql.args.1.value", "1");
+ out.assertAttributeEquals("sql.args.2.type", String.valueOf(java.sql.Types.VARCHAR));
+ out.assertAttributeEquals("sql.args.2.value", "Mark");
+ out.assertAttributeEquals("sql.args.3.type", String.valueOf(java.sql.Types.INTEGER));
+ out.assertAttributeEquals("sql.args.3.value", "48");
+
+ out.assertContentEquals("UPDATE PERSONS SET ID = ? WHERE NAME = ? AND CODE = ?");
+
+ } // End testUpdateWithMissingColumnIgnore()
+
/**
* Simple implementation only for testing purposes
[3/3] nifi git commit: Merge branch 'NIFI-1093' of
https://github.com/olegz/nifi into NIFI-1093
Posted by ma...@apache.org.
Merge branch 'NIFI-1093' of https://github.com/olegz/nifi into NIFI-1093
Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/ef80549d
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/ef80549d
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/ef80549d
Branch: refs/heads/master
Commit: ef80549d63a5dc347731701d32a80c2446c46523
Parents: 171b9c4 0126800
Author: Mark Payne <ma...@hotmail.com>
Authored: Mon Feb 1 11:51:27 2016 -0500
Committer: Mark Payne <ma...@hotmail.com>
Committed: Mon Feb 1 11:51:27 2016 -0500
----------------------------------------------------------------------
.../processors/standard/ConvertJSONToSQL.java | 47 +++++++++++---------
.../standard/TestConvertJSONToSQL.java | 8 ++--
2 files changed, 31 insertions(+), 24 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/nifi/blob/ef80549d/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ConvertJSONToSQL.java
----------------------------------------------------------------------
diff --cc nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ConvertJSONToSQL.java
index e7226d1,f8306a2..3eb44cb
mode 100755,100644..100755
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ConvertJSONToSQL.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ConvertJSONToSQL.java
@@@ -97,15 -97,18 +97,18 @@@ public class ConvertJSONToSQL extends A
private static final String INSERT_TYPE = "INSERT";
static final AllowableValue IGNORE_UNMATCHED_FIELD = new AllowableValue("Ignore Unmatched Fields", "Ignore Unmatched Fields",
- "Any field in the JSON document that cannot be mapped to a column in the database is ignored");
+ "Any field in the JSON document that cannot be mapped to a column in the database is ignored");
static final AllowableValue FAIL_UNMATCHED_FIELD = new AllowableValue("Fail", "Fail",
- "If the JSON document has any field that cannot be mapped to a column in the database, the FlowFile will be routed to the failure relationship");
- static final AllowableValue IGNORE_UNMATCHED_COLUMN = new AllowableValue("Ignore Unmatched Columns", "Ignore Unmatched Columns",
+ "If the JSON document has any field that cannot be mapped to a column in the database, the FlowFile will be routed to the failure relationship");
+ static final AllowableValue IGNORE_UNMATCHED_COLUMN = new AllowableValue("Ignore Unmatched Columns",
+ "Ignore Unmatched Columns",
"Any column in the database that does not have a field in the JSON document will be assumed to not be required. No notification will be logged");
- static final AllowableValue WARNING_UNMATCHED_COLUMN = new AllowableValue("Warning Unmatched Columns", "Warning Unmatched Columns",
+ static final AllowableValue WARNING_UNMATCHED_COLUMN = new AllowableValue("Warn on Unmatched Columns",
- "Warning Unmatched Columns",
++ "Warn on Unmatched Columns",
"Any column in the database that does not have a field in the JSON document will be assumed to not be required. A warning will be logged");
- static final AllowableValue FAIL_UNMATCHED_COLUMN = new AllowableValue("Fail Unmatched Columns", "Fail Unmatched Columns",
- "Any column in the database that does not have a field in the JSON document will fail the flow. An error will be logged");
+ static final AllowableValue FAIL_UNMATCHED_COLUMN = new AllowableValue("Fail on Unmatched Columns",
+ "Fail on Unmatched Columns",
+ "A flow will fail if any column in the database does not have a field in the JSON document. An error will be logged");
static final PropertyDescriptor CONNECTION_POOL = new PropertyDescriptor.Builder()
.name("JDBC Connection Pool")
@@@ -477,12 -482,15 +482,14 @@@
for (final String uk : updateKeyNames) {
final String normalizedUK = normalizeColumnName(uk, translateFieldNames);
normalizedUpdateNames.add(normalizedUK);
- if(!normalizedFieldNames.contains(normalizedUK)) {
- if(failUnmappedColumns) {
- getLogger().error("JSON does not have a value for the " + (updateKeys == null ? "Primary" : "Update") + "Key column '" + uk + "'");
- throw new ProcessException("JSON does not have a value for the " + (updateKeys == null ? "Primary" : "Update") + "Key column '" + uk + "'");
- } else if(warningUnmappedColumns) {
- getLogger().warn("JSON does not have a value for the " + (updateKeys == null ? "Primary" : "Update") + "Key column '" + uk + "'");
+
+ if (!normalizedFieldNames.contains(normalizedUK)) {
- String missingColMessage = "JSON does not have a value for the "
- + (updateKeys == null ? "Primary" : "Update") + "Key column '" + uk + "'";
++ String missingColMessage = "JSON does not have a value for the " + (updateKeys == null ? "Primary" : "Update") + "Key column '" + uk + "'";
+ if (failUnmappedColumns) {
+ getLogger().error(missingColMessage);
+ throw new ProcessException(missingColMessage);
+ } else if (warningUnmappedColumns) {
+ getLogger().warn(missingColMessage);
}
}
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/ef80549d/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestConvertJSONToSQL.java
----------------------------------------------------------------------
diff --cc nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestConvertJSONToSQL.java
index dd89e96,f4d5b96..60d993e
mode 100755,100644..100755
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestConvertJSONToSQL.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestConvertJSONToSQL.java
@@@ -418,7 -420,7 +418,7 @@@ public class TestConvertJSONToSQL
runner.setProperty(ConvertJSONToSQL.CONNECTION_POOL, "dbcp");
runner.setProperty(ConvertJSONToSQL.TABLE_NAME, "PERSONS");
runner.setProperty(ConvertJSONToSQL.STATEMENT_TYPE, "INSERT");
- runner.setProperty(ConvertJSONToSQL.UNMATCHED_COLUMN_BEHAVIOR, "Fail Unmatched Columns");
- runner.setProperty(ConvertJSONToSQL.UNMATCHED_COLUMN_BEHAVIOR, "Fail on Unmatched Columns");
++ runner.setProperty(ConvertJSONToSQL.UNMATCHED_COLUMN_BEHAVIOR, ConvertJSONToSQL.FAIL_UNMATCHED_COLUMN);
runner.enqueue(Paths.get("src/test/resources/TestConvertJSONToSQL/person-1.json"));
runner.run();
@@@ -443,7 -447,7 +443,7 @@@
runner.setProperty(ConvertJSONToSQL.CONNECTION_POOL, "dbcp");
runner.setProperty(ConvertJSONToSQL.TABLE_NAME, "PERSONS");
runner.setProperty(ConvertJSONToSQL.STATEMENT_TYPE, "INSERT");
- runner.setProperty(ConvertJSONToSQL.UNMATCHED_COLUMN_BEHAVIOR, "Warning Unmatched Columns");
- runner.setProperty(ConvertJSONToSQL.UNMATCHED_COLUMN_BEHAVIOR, "Warn on Unmatched Columns");
++ runner.setProperty(ConvertJSONToSQL.UNMATCHED_COLUMN_BEHAVIOR, ConvertJSONToSQL.WARNING_UNMATCHED_COLUMN);
runner.enqueue(Paths.get("src/test/resources/TestConvertJSONToSQL/person-1.json"));
runner.run();
@@@ -514,7 -521,7 +514,7 @@@
runner.setProperty(ConvertJSONToSQL.TABLE_NAME, "PERSONS");
runner.setProperty(ConvertJSONToSQL.STATEMENT_TYPE, "UPDATE");
runner.setProperty(ConvertJSONToSQL.UPDATE_KEY, "name, code, extra");
- runner.setProperty(ConvertJSONToSQL.UNMATCHED_COLUMN_BEHAVIOR, "Fail Unmatched Columns");
- runner.setProperty(ConvertJSONToSQL.UNMATCHED_COLUMN_BEHAVIOR, "Fail on Unmatched Columns");
++ runner.setProperty(ConvertJSONToSQL.UNMATCHED_COLUMN_BEHAVIOR, ConvertJSONToSQL.FAIL_UNMATCHED_COLUMN);
runner.enqueue(Paths.get("src/test/resources/TestConvertJSONToSQL/person-1.json"));
runner.run();
@@@ -540,7 -548,7 +540,7 @@@
runner.setProperty(ConvertJSONToSQL.TABLE_NAME, "PERSONS");
runner.setProperty(ConvertJSONToSQL.STATEMENT_TYPE, "UPDATE");
runner.setProperty(ConvertJSONToSQL.UPDATE_KEY, "name, code, extra");
- runner.setProperty(ConvertJSONToSQL.UNMATCHED_COLUMN_BEHAVIOR, "Warning Unmatched Columns");
- runner.setProperty(ConvertJSONToSQL.UNMATCHED_COLUMN_BEHAVIOR, "Warn on Unmatched Columns");
++ runner.setProperty(ConvertJSONToSQL.UNMATCHED_COLUMN_BEHAVIOR, ConvertJSONToSQL.WARNING_UNMATCHED_COLUMN);
runner.enqueue(Paths.get("src/test/resources/TestConvertJSONToSQL/person-1.json"));
runner.run();