You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by bb...@apache.org on 2017/03/21 21:20:41 UTC

nifi git commit: NIFI-3626: Add support for DELETE in ConvertJSONToSQL

Repository: nifi
Updated Branches:
  refs/heads/master 40acd4a6e -> c4d0c0bbd


NIFI-3626: Add support for DELETE in ConvertJSONToSQL

This closes #1605.

Signed-off-by: Bryan Bende <bb...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/c4d0c0bb
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/c4d0c0bb
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/c4d0c0bb

Branch: refs/heads/master
Commit: c4d0c0bbd18759100641d0cbca0b584f8f20a813
Parents: 40acd4a
Author: Matt Burgess <ma...@apache.org>
Authored: Mon Mar 20 15:05:46 2017 -0400
Committer: Bryan Bende <bb...@apache.org>
Committed: Tue Mar 21 17:20:27 2017 -0400

----------------------------------------------------------------------
 .../processors/standard/ConvertJSONToSQL.java   |  90 +++++++++++-
 .../standard/TestConvertJSONToSQL.java          | 144 +++++++++++++++++++
 2 files changed, 230 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/c4d0c0bb/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ConvertJSONToSQL.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ConvertJSONToSQL.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ConvertJSONToSQL.java
index 4d8f462..2eb9cad 100755
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ConvertJSONToSQL.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ConvertJSONToSQL.java
@@ -74,8 +74,8 @@ import static org.apache.nifi.flowfile.attributes.FragmentAttributes.copyAttribu
 @SupportsBatching
 @SeeAlso(PutSQL.class)
 @InputRequirement(Requirement.INPUT_REQUIRED)
-@Tags({"json", "sql", "database", "rdbms", "insert", "update", "relational", "flat"})
-@CapabilityDescription("Converts a JSON-formatted FlowFile into an UPDATE or INSERT SQL statement. The incoming FlowFile is expected to be "
+@Tags({"json", "sql", "database", "rdbms", "insert", "update", "delete", "relational", "flat"})
+@CapabilityDescription("Converts a JSON-formatted FlowFile into an UPDATE, INSERT, or DELETE SQL statement. The incoming FlowFile is expected to be "
         + "\"flat\" JSON message, meaning that it consists of a single JSON element and each field maps to a simple type. If a field maps to "
         + "a JSON object, that JSON object will be interpreted as Text. If the input is an array of JSON elements, each element in the array is "
         + "output as a separate FlowFile to the 'sql' relationship. Upon successful conversion, the original FlowFile is routed to the 'original' "
@@ -102,6 +102,7 @@ import static org.apache.nifi.flowfile.attributes.FragmentAttributes.copyAttribu
 public class ConvertJSONToSQL extends AbstractProcessor {
     private static final String UPDATE_TYPE = "UPDATE";
     private static final String INSERT_TYPE = "INSERT";
+    private static final String DELETE_TYPE = "DELETE";
 
     static final AllowableValue IGNORE_UNMATCHED_FIELD = new AllowableValue("Ignore Unmatched Fields", "Ignore Unmatched Fields",
             "Any field in the JSON document that cannot be mapped to a column in the database is ignored");
@@ -128,7 +129,7 @@ public class ConvertJSONToSQL extends AbstractProcessor {
             .name("Statement Type")
             .description("Specifies the type of SQL Statement to generate")
             .required(true)
-            .allowableValues(UPDATE_TYPE, INSERT_TYPE)
+            .allowableValues(UPDATE_TYPE, INSERT_TYPE, DELETE_TYPE)
             .build();
     static final PropertyDescriptor TABLE_NAME = new PropertyDescriptor.Builder()
             .name("Table Name")
@@ -363,9 +364,12 @@ public class ConvertJSONToSQL extends AbstractProcessor {
                 if (INSERT_TYPE.equals(statementType)) {
                     sql = generateInsert(jsonNode, attributes, fqTableName, schema, translateFieldNames, ignoreUnmappedFields,
                             failUnmappedColumns, warningUnmappedColumns, escapeColumnNames, quoteTableName);
-                } else {
+                } else if (UPDATE_TYPE.equals(statementType)) {
                     sql = generateUpdate(jsonNode, attributes, fqTableName, updateKeys, schema, translateFieldNames, ignoreUnmappedFields,
                             failUnmappedColumns, warningUnmappedColumns, escapeColumnNames, quoteTableName);
+                } else {
+                    sql = generateDelete(jsonNode, attributes, fqTableName, schema, translateFieldNames, ignoreUnmappedFields,
+                            failUnmappedColumns, warningUnmappedColumns, escapeColumnNames, quoteTableName);
                 }
             } catch (final ProcessException pe) {
                 getLogger().error("Failed to convert {} to a SQL {} statement due to {}; routing to failure",
@@ -650,6 +654,84 @@ public class ConvertJSONToSQL extends AbstractProcessor {
         return sqlBuilder.toString();
     }
 
+    private String generateDelete(final JsonNode rootNode, final Map<String, String> attributes, final String tableName,
+                                  final TableSchema schema, final boolean translateFieldNames, final boolean ignoreUnmappedFields, final boolean failUnmappedColumns,
+                                  final boolean warningUnmappedColumns, boolean escapeColumnNames, boolean quoteTableName) {
+        final Set<String> normalizedFieldNames = getNormalizedColumnNames(rootNode, translateFieldNames);
+        for (final String requiredColName : schema.getRequiredColumnNames()) { // verify each required column has a JSON field
+            final String normalizedColName = normalizeColumnName(requiredColName, translateFieldNames);
+            if (!normalizedFieldNames.contains(normalizedColName)) {
+                String missingColMessage = "JSON does not have a value for the Required column '" + requiredColName + "'";
+                if (failUnmappedColumns) {
+                    getLogger().error(missingColMessage);
+                    throw new ProcessException(missingColMessage);
+                } else if (warningUnmappedColumns) {
+                    getLogger().warn(missingColMessage);
+                } // neither flag set: the missing required column is ignored without logging
+            }
+        }
+
+        final StringBuilder sqlBuilder = new StringBuilder();
+        int fieldCount = 0; // number of JSON fields mapped to a column so far
+        sqlBuilder.append("DELETE FROM ");
+        if (quoteTableName) {
+            sqlBuilder.append(schema.getQuotedIdentifierString())
+                    .append(tableName)
+                    .append(schema.getQuotedIdentifierString());
+        } else {
+            sqlBuilder.append(tableName);
+        }
+
+        sqlBuilder.append(" WHERE ");
+
+        // iterate over all of the fields in the JSON, appending a "<column> = ?" predicate (joined by AND) for each
+        // field that maps to a column, and recording the column type in a "sql.args.N.type" attribute and the field
+        // value in a "sql.args.N.value" attribute, where N is the 1-based position of the predicate
+        final Iterator<String> fieldNames = rootNode.getFieldNames();
+        while (fieldNames.hasNext()) {
+            final String fieldName = fieldNames.next();
+
+            final ColumnDescription desc = schema.getColumns().get(normalizeColumnName(fieldName, translateFieldNames));
+            if (desc == null && !ignoreUnmappedFields) {
+                throw new ProcessException("Cannot map JSON field '" + fieldName + "' to any column in the database");
+            }
+
+            if (desc != null) { // an unmapped field only gets this far when it is being ignored; it is skipped
+                if (fieldCount++ > 0) { // post-increment: fieldCount now holds this predicate's 1-based index
+                    sqlBuilder.append(" AND ");
+                }
+
+                if (escapeColumnNames) {
+                    sqlBuilder.append(schema.getQuotedIdentifierString())
+                            .append(desc.getColumnName())
+                            .append(schema.getQuotedIdentifierString());
+                } else {
+                    sqlBuilder.append(desc.getColumnName());
+                }
+                sqlBuilder.append(" = ?");
+
+                final int sqlType = desc.getDataType();
+                attributes.put("sql.args." + fieldCount + ".type", String.valueOf(sqlType));
+
+                final Integer colSize = desc.getColumnSize();
+                final JsonNode fieldNode = rootNode.get(fieldName);
+                if (!fieldNode.isNull()) { // JSON null: only the type attribute is written, no value attribute
+                    String fieldValue = fieldNode.asText();
+                    if (colSize != null && fieldValue.length() > colSize) { // text longer than the column size is truncated
+                        fieldValue = fieldValue.substring(0, colSize);
+                    }
+                    attributes.put("sql.args." + fieldCount + ".value", fieldValue);
+                } // NOTE(review): predicate stays "col = ?" for JSON null; "= NULL" matches no rows in SQL - confirm intended
+            }
+        }
+
+        if (fieldCount == 0) { // no predicate was appended, so the WHERE clause would be empty
+            throw new ProcessException("None of the fields in the JSON map to the columns defined by the " + tableName + " table");
+        }
+
+        return sqlBuilder.toString();
+    }
+
     private static String normalizeColumnName(final String colName, final boolean translateColumnNames) {
         return translateColumnNames ? colName.toUpperCase().replace("_", "") : colName;
     }

http://git-wip-us.apache.org/repos/asf/nifi/blob/c4d0c0bb/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestConvertJSONToSQL.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestConvertJSONToSQL.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestConvertJSONToSQL.java
index cf1459c..bc9d7f9 100755
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestConvertJSONToSQL.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestConvertJSONToSQL.java
@@ -833,6 +833,150 @@ public class TestConvertJSONToSQL {
     } // End testUpdateWithMissingColumnIgnore()
 
 
+    @Test
+    public void testDelete() throws InitializationException, ProcessException, SQLException, IOException {
+        final TestRunner runner = TestRunners.newTestRunner(ConvertJSONToSQL.class);
+        final File tempDir = folder.getRoot();
+        final File dbDir = new File(tempDir, "db");
+        final DBCPService service = new MockDBCPService(dbDir.getAbsolutePath());
+        runner.addControllerService("dbcp", service);
+        runner.enableControllerService(service);
+
+        try (final Connection conn = service.getConnection()) {
+            try (final Statement stmt = conn.createStatement()) {
+                stmt.executeUpdate(createPersons); // declared outside this hunk; presumably the DDL for the PERSONS table
+            }
+        }
+
+        runner.setProperty(ConvertJSONToSQL.CONNECTION_POOL, "dbcp");
+        runner.setProperty(ConvertJSONToSQL.TABLE_NAME, "PERSONS");
+        runner.setProperty(ConvertJSONToSQL.STATEMENT_TYPE, "DELETE");
+        runner.enqueue(Paths.get("src/test/resources/TestConvertJSONToSQL/person-1.json"));
+        runner.run();
+
+        runner.assertTransferCount(ConvertJSONToSQL.REL_ORIGINAL, 1);
+        runner.getFlowFilesForRelationship(ConvertJSONToSQL.REL_ORIGINAL).get(0).assertAttributeEquals(FRAGMENT_COUNT.key(), "1");
+        runner.assertTransferCount(ConvertJSONToSQL.REL_SQL, 1);
+        final MockFlowFile out = runner.getFlowFilesForRelationship(ConvertJSONToSQL.REL_SQL).get(0);
+        out.assertAttributeEquals("sql.args.1.type", String.valueOf(java.sql.Types.INTEGER)); // args follow the predicate order asserted below
+        out.assertAttributeEquals("sql.args.1.value", "1");
+        out.assertAttributeEquals("sql.args.2.type", String.valueOf(java.sql.Types.VARCHAR));
+        out.assertAttributeEquals("sql.args.2.value", "Mark");
+        out.assertAttributeEquals("sql.args.3.type", String.valueOf(java.sql.Types.INTEGER));
+        out.assertAttributeEquals("sql.args.3.value", "48");
+
+        out.assertContentEquals("DELETE FROM PERSONS WHERE ID = ? AND NAME = ? AND CODE = ?"); // no quoting when neither quoting property is set
+    }
+
+    @Test
+    public void testDeleteQuotedIdentifiers() throws InitializationException, ProcessException, SQLException, IOException {
+        final TestRunner runner = TestRunners.newTestRunner(ConvertJSONToSQL.class);
+        final File tempDir = folder.getRoot();
+        final File dbDir = new File(tempDir, "db");
+        final DBCPService service = new MockDBCPService(dbDir.getAbsolutePath());
+        runner.addControllerService("dbcp", service);
+        runner.enableControllerService(service);
+
+        try (final Connection conn = service.getConnection()) {
+            try (final Statement stmt = conn.createStatement()) {
+                stmt.executeUpdate(createPersons); // declared outside this hunk; presumably the DDL for the PERSONS table
+            }
+        }
+
+        runner.setProperty(ConvertJSONToSQL.CONNECTION_POOL, "dbcp");
+        runner.setProperty(ConvertJSONToSQL.TABLE_NAME, "PERSONS");
+        runner.setProperty(ConvertJSONToSQL.STATEMENT_TYPE, "DELETE");
+        runner.setProperty(ConvertJSONToSQL.QUOTED_IDENTIFIERS, "true"); // expect the column names, but not the table name, to be quoted
+
+        runner.enqueue(Paths.get("src/test/resources/TestConvertJSONToSQL/person-1.json"));
+        runner.run();
+
+        runner.assertTransferCount(ConvertJSONToSQL.REL_ORIGINAL, 1);
+        runner.getFlowFilesForRelationship(ConvertJSONToSQL.REL_ORIGINAL).get(0).assertAttributeEquals(FRAGMENT_COUNT.key(), "1");
+        runner.assertTransferCount(ConvertJSONToSQL.REL_SQL, 1);
+        final MockFlowFile out = runner.getFlowFilesForRelationship(ConvertJSONToSQL.REL_SQL).get(0);
+        out.assertAttributeEquals("sql.args.1.type", String.valueOf(java.sql.Types.INTEGER)); // quoting must not change the argument attributes
+        out.assertAttributeEquals("sql.args.1.value", "1");
+        out.assertAttributeEquals("sql.args.2.type", String.valueOf(java.sql.Types.VARCHAR));
+        out.assertAttributeEquals("sql.args.2.value", "Mark");
+        out.assertAttributeEquals("sql.args.3.type", String.valueOf(java.sql.Types.INTEGER));
+        out.assertAttributeEquals("sql.args.3.value", "48");
+
+        out.assertContentEquals("DELETE FROM PERSONS WHERE \"ID\" = ? AND \"NAME\" = ? AND \"CODE\" = ?");
+    }
+
+    @Test
+    public void testDeleteQuotedTableIdentifier() throws InitializationException, ProcessException, SQLException, IOException {
+        final TestRunner runner = TestRunners.newTestRunner(ConvertJSONToSQL.class);
+        final File tempDir = folder.getRoot();
+        final File dbDir = new File(tempDir, "db");
+        final DBCPService service = new MockDBCPService(dbDir.getAbsolutePath());
+        runner.addControllerService("dbcp", service);
+        runner.enableControllerService(service);
+
+        try (final Connection conn = service.getConnection()) {
+            try (final Statement stmt = conn.createStatement()) {
+                stmt.executeUpdate(createPersons); // declared outside this hunk; presumably the DDL for the PERSONS table
+            }
+        }
+
+        runner.setProperty(ConvertJSONToSQL.CONNECTION_POOL, "dbcp");
+        runner.setProperty(ConvertJSONToSQL.TABLE_NAME, "PERSONS");
+        runner.setProperty(ConvertJSONToSQL.STATEMENT_TYPE, "DELETE");
+        runner.setProperty(ConvertJSONToSQL.QUOTED_TABLE_IDENTIFIER, "true"); // expect only the table name to be quoted
+
+        runner.enqueue(Paths.get("src/test/resources/TestConvertJSONToSQL/person-1.json"));
+        runner.run();
+
+        runner.assertTransferCount(ConvertJSONToSQL.REL_ORIGINAL, 1);
+        runner.getFlowFilesForRelationship(ConvertJSONToSQL.REL_ORIGINAL).get(0).assertAttributeEquals(FRAGMENT_COUNT.key(), "1");
+        runner.assertTransferCount(ConvertJSONToSQL.REL_SQL, 1);
+        final MockFlowFile out = runner.getFlowFilesForRelationship(ConvertJSONToSQL.REL_SQL).get(0);
+        out.assertAttributeEquals("sql.args.1.type", String.valueOf(java.sql.Types.INTEGER)); // quoting must not change the argument attributes
+        out.assertAttributeEquals("sql.args.1.value", "1");
+        out.assertAttributeEquals("sql.args.2.type", String.valueOf(java.sql.Types.VARCHAR));
+        out.assertAttributeEquals("sql.args.2.value", "Mark");
+        out.assertAttributeEquals("sql.args.3.type", String.valueOf(java.sql.Types.INTEGER));
+        out.assertAttributeEquals("sql.args.3.value", "48");
+
+        out.assertContentEquals("DELETE FROM \"PERSONS\" WHERE ID = ? AND NAME = ? AND CODE = ?");
+    }
+
+    @Test
+    public void testDeleteWithNullValue() throws InitializationException, ProcessException, SQLException, IOException {
+        final TestRunner runner = TestRunners.newTestRunner(ConvertJSONToSQL.class);
+        final File tempDir = folder.getRoot();
+        final File dbDir = new File(tempDir, "db");
+        final DBCPService service = new MockDBCPService(dbDir.getAbsolutePath());
+        runner.addControllerService("dbcp", service);
+        runner.enableControllerService(service);
+
+        try (final Connection conn = service.getConnection()) {
+            try (final Statement stmt = conn.createStatement()) {
+                stmt.executeUpdate(createPersons); // declared outside this hunk; presumably the DDL for the PERSONS table
+            }
+        }
+
+        runner.setProperty(ConvertJSONToSQL.CONNECTION_POOL, "dbcp");
+        runner.setProperty(ConvertJSONToSQL.TABLE_NAME, "PERSONS");
+        runner.setProperty(ConvertJSONToSQL.STATEMENT_TYPE, "DELETE");
+        runner.enqueue(Paths.get("src/test/resources/TestConvertJSONToSQL/person-with-null-code.json")); // name suggests CODE is JSON null; fixture not visible here
+        runner.run();
+
+        runner.assertTransferCount(ConvertJSONToSQL.REL_ORIGINAL, 1);
+        runner.getFlowFilesForRelationship(ConvertJSONToSQL.REL_ORIGINAL).get(0).assertAttributeEquals(FRAGMENT_COUNT.key(), "1");
+        runner.assertTransferCount(ConvertJSONToSQL.REL_SQL, 1);
+        final MockFlowFile out = runner.getFlowFilesForRelationship(ConvertJSONToSQL.REL_SQL).get(0);
+        out.assertAttributeEquals("sql.args.1.type", String.valueOf(java.sql.Types.INTEGER));
+        out.assertAttributeEquals("sql.args.1.value", "1");
+        out.assertAttributeEquals("sql.args.2.type", String.valueOf(java.sql.Types.VARCHAR));
+        out.assertAttributeEquals("sql.args.2.value", "Mark");
+        out.assertAttributeEquals("sql.args.3.type", String.valueOf(java.sql.Types.INTEGER)); // the type is still recorded for the third argument
+        out.assertAttributeNotExists("sql.args.3.value"); // but no value attribute is expected for it
+
+        out.assertContentEquals("DELETE FROM PERSONS WHERE ID = ? AND NAME = ? AND CODE = ?"); // the CODE predicate is still emitted as "= ?"
+    }
+
     /**
      * Simple implementation only for testing purposes
      */