Posted to commits@nifi.apache.org by tk...@apache.org on 2015/11/19 07:20:50 UTC
[07/24] nifi git commit: NIFI-1054: Fixing Line endings of source code
http://git-wip-us.apache.org/repos/asf/nifi/blob/e2d3d1b7/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ConvertJSONToSQL.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ConvertJSONToSQL.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ConvertJSONToSQL.java
index 6f14800..432f22d 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ConvertJSONToSQL.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ConvertJSONToSQL.java
@@ -1,684 +1,684 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.processors.standard;
-
-import java.io.BufferedInputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.nio.charset.StandardCharsets;
-import java.sql.Connection;
-import java.sql.ResultSet;
-import java.sql.SQLException;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.LinkedHashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.UUID;
-
-import org.apache.nifi.annotation.behavior.InputRequirement;
-import org.apache.nifi.annotation.behavior.SideEffectFree;
-import org.apache.nifi.annotation.behavior.SupportsBatching;
-import org.apache.nifi.annotation.behavior.WritesAttribute;
-import org.apache.nifi.annotation.behavior.WritesAttributes;
-import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
-import org.apache.nifi.annotation.documentation.CapabilityDescription;
-import org.apache.nifi.annotation.documentation.SeeAlso;
-import org.apache.nifi.annotation.documentation.Tags;
-import org.apache.nifi.annotation.lifecycle.OnScheduled;
-import org.apache.nifi.components.AllowableValue;
-import org.apache.nifi.components.PropertyDescriptor;
-import org.apache.nifi.dbcp.DBCPService;
-import org.apache.nifi.flowfile.FlowFile;
-import org.apache.nifi.flowfile.attributes.CoreAttributes;
-import org.apache.nifi.processor.AbstractProcessor;
-import org.apache.nifi.processor.ProcessContext;
-import org.apache.nifi.processor.ProcessSession;
-import org.apache.nifi.processor.Relationship;
-import org.apache.nifi.processor.exception.ProcessException;
-import org.apache.nifi.processor.io.InputStreamCallback;
-import org.apache.nifi.processor.io.OutputStreamCallback;
-import org.apache.nifi.processor.util.StandardValidators;
-import org.apache.nifi.util.ObjectHolder;
-import org.codehaus.jackson.JsonNode;
-import org.codehaus.jackson.map.ObjectMapper;
-import org.codehaus.jackson.node.ArrayNode;
-import org.codehaus.jackson.node.JsonNodeFactory;
-
-@SideEffectFree
-@SupportsBatching
-@SeeAlso(PutSQL.class)
-@InputRequirement(Requirement.INPUT_REQUIRED)
-@Tags({"json", "sql", "database", "rdbms", "insert", "update", "relational", "flat"})
-@CapabilityDescription("Converts a JSON-formatted FlowFile into an UPDATE or INSERT SQL statement. The incoming FlowFile is expected to be "
- + "\"flat\" JSON message, meaning that it consists of a single JSON element and each field maps to a simple type. If a field maps to "
- + "a JSON object, that JSON object will be interpreted as Text. If the input is an array of JSON elements, each element in the array is "
- + "output as a separate FlowFile to the 'sql' relationship. Upon successful conversion, the original FlowFile is routed to the 'original' "
- + "relationship and the SQL is routed to the 'sql' relationship.")
-@WritesAttributes({
- @WritesAttribute(attribute="mime.type", description="Sets mime.type of FlowFile that is routed to 'sql' to 'text/plain'."),
- @WritesAttribute(attribute="sql.table", description="Sets the sql.table attribute of FlowFile that is routed to 'sql' to the name of the table that is updated by the SQL statement."),
- @WritesAttribute(attribute="sql.catalog", description="If the Catalog name is set for this database, specifies the name of the catalog that the SQL statement will update. "
- + "If no catalog is used, this attribute will not be added."),
- @WritesAttribute(attribute="fragment.identifier", description="All FlowFiles routed to the 'sql' relationship for the same incoming FlowFile (multiple will be output for the same incoming "
- + "FlowFile if the incoming FlowFile is a JSON Array) will have the same value for the fragment.identifier attribute. This can then be used to correlate the results."),
- @WritesAttribute(attribute="fragment.count", description="The number of SQL FlowFiles that were produced for same incoming FlowFile. This can be used in conjunction with the "
- + "fragment.identifier attribute in order to know how many FlowFiles belonged to the same incoming FlowFile."),
- @WritesAttribute(attribute="fragment.index", description="The position of this FlowFile in the list of outgoing FlowFiles that were all derived from the same incoming FlowFile. This can be "
- + "used in conjunction with the fragment.identifier and fragment.count attributes to know which FlowFiles originated from the same incoming FlowFile and in what order the SQL "
- + "FlowFiles were produced"),
- @WritesAttribute(attribute="sql.args.N.type", description="The output SQL statements are parameterized in order to avoid SQL Injection Attacks. The types of the Parameters "
- + "to use are stored in attributes named sql.args.1.type, sql.args.2.type, sql.args.3.type, and so on. The type is a number representing a JDBC Type constant. "
- + "Generally, this is useful only for software to read and interpret but is added so that a processor such as PutSQL can understand how to interpret the values."),
- @WritesAttribute(attribute="sql.args.N.value", description="The output SQL statements are parameterized in order to avoid SQL Injection Attacks. The values of the Parameters "
- + "to use are stored in the attributes named sql.args.1.value, sql.args.2.value, sql.args.3.value, and so on. Each of these attributes has a corresponding "
- + "sql.args.N.type attribute that indicates how the value should be interpreted when inserting it into the database.")
-})
-public class ConvertJSONToSQL extends AbstractProcessor {
- private static final String UPDATE_TYPE = "UPDATE";
- private static final String INSERT_TYPE = "INSERT";
-
- static final AllowableValue IGNORE_UNMATCHED_FIELD = new AllowableValue("Ignore Unmatched Fields", "Ignore Unmatched Fields",
- "Any field in the JSON document that cannot be mapped to a column in the database is ignored");
- static final AllowableValue FAIL_UNMATCHED_FIELD = new AllowableValue("Fail", "Fail",
- "If the JSON document has any field that cannot be mapped to a column in the database, the FlowFile will be routed to the failure relationship");
-
- static final PropertyDescriptor CONNECTION_POOL = new PropertyDescriptor.Builder()
- .name("JDBC Connection Pool")
- .description("Specifies the JDBC Connection Pool to use in order to convert the JSON message to a SQL statement. "
- + "The Connection Pool is necessary in order to determine the appropriate database column types.")
- .identifiesControllerService(DBCPService.class)
- .required(true)
- .build();
- static final PropertyDescriptor STATEMENT_TYPE = new PropertyDescriptor.Builder()
- .name("Statement Type")
- .description("Specifies the type of SQL Statement to generate")
- .required(true)
- .allowableValues(UPDATE_TYPE, INSERT_TYPE)
- .build();
- static final PropertyDescriptor TABLE_NAME = new PropertyDescriptor.Builder()
- .name("Table Name")
- .description("The name of the table that the statement should update")
- .required(true)
- .expressionLanguageSupported(true)
- .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
- .build();
- static final PropertyDescriptor CATALOG_NAME = new PropertyDescriptor.Builder()
- .name("Catalog Name")
- .description("The name of the catalog that the statement should update. This may not apply for the database that you are updating. In this case, leave the field empty")
- .required(false)
- .expressionLanguageSupported(true)
- .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
- .build();
- static final PropertyDescriptor TRANSLATE_FIELD_NAMES = new PropertyDescriptor.Builder()
- .name("Translate Field Names")
- .description("If true, the Processor will attempt to translate JSON field names into the appropriate column names for the table specified. "
- + "If false, the JSON field names must match the column names exactly, or the column will not be updated")
- .allowableValues("true", "false")
- .defaultValue("true")
- .build();
- static final PropertyDescriptor UNMATCHED_FIELD_BEHAVIOR = new PropertyDescriptor.Builder()
- .name("Unmatched Field Behavior")
- .description("If an incoming JSON element has a field that does not map to any of the database table's columns, this property specifies how to handle the situation")
- .allowableValues(IGNORE_UNMATCHED_FIELD, FAIL_UNMATCHED_FIELD)
- .defaultValue(IGNORE_UNMATCHED_FIELD.getValue())
- .build();
- static final PropertyDescriptor UPDATE_KEY = new PropertyDescriptor.Builder()
- .name("Update Keys")
- .description("A comma-separated list of column names that uniquely identifies a row in the database for UPDATE statements. "
- + "If the Statement Type is UPDATE and this property is not set, the table's Primary Keys are used. "
- + "In this case, if no Primary Key exists, the conversion to SQL will fail. "
- + "This property is ignored if the Statement Type is INSERT")
- .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
- .required(false)
- .expressionLanguageSupported(true)
- .build();
-
-
- static final Relationship REL_ORIGINAL = new Relationship.Builder()
- .name("original")
- .description("When a FlowFile is converted to SQL, the original JSON FlowFile is routed to this relationship")
- .build();
- static final Relationship REL_SQL = new Relationship.Builder()
- .name("sql")
- .description("A FlowFile is routed to this relationship when its contents have successfully been converted into a SQL statement")
- .build();
- static final Relationship REL_FAILURE = new Relationship.Builder()
- .name("failure")
- .description("A FlowFile is routed to this relationship if it cannot be converted into a SQL statement. Common causes include invalid JSON "
- + "content or the JSON content missing a required field (if using an INSERT statement type).")
- .build();
-
- private final Map<SchemaKey, TableSchema> schemaCache = new LinkedHashMap<SchemaKey, TableSchema>(100) {
- private static final long serialVersionUID = 1L;
-
- @Override
- protected boolean removeEldestEntry(Map.Entry<SchemaKey,TableSchema> eldest) {
- return size() >= 100;
- }
- };
-
- @Override
- protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
- final List<PropertyDescriptor> properties = new ArrayList<>();
- properties.add(CONNECTION_POOL);
- properties.add(STATEMENT_TYPE);
- properties.add(TABLE_NAME);
- properties.add(CATALOG_NAME);
- properties.add(TRANSLATE_FIELD_NAMES);
- properties.add(UNMATCHED_FIELD_BEHAVIOR);
- properties.add(UPDATE_KEY);
- return properties;
- }
-
-
- @Override
- public Set<Relationship> getRelationships() {
- final Set<Relationship> rels = new HashSet<>();
- rels.add(REL_ORIGINAL);
- rels.add(REL_SQL);
- rels.add(REL_FAILURE);
- return rels;
- }
-
-
- @OnScheduled
- public void onScheduled(final ProcessContext context) {
- synchronized (this) {
- schemaCache.clear();
- }
- }
-
- @Override
- public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
- FlowFile flowFile = session.get();
- if (flowFile == null) {
- return;
- }
-
- final boolean translateFieldNames = context.getProperty(TRANSLATE_FIELD_NAMES).asBoolean();
- final boolean ignoreUnmappedFields = IGNORE_UNMATCHED_FIELD.getValue().equalsIgnoreCase(context.getProperty(UNMATCHED_FIELD_BEHAVIOR).getValue());
- final String statementType = context.getProperty(STATEMENT_TYPE).getValue();
- final String updateKeys = context.getProperty(UPDATE_KEY).evaluateAttributeExpressions(flowFile).getValue();
-
- final String catalog = context.getProperty(CATALOG_NAME).evaluateAttributeExpressions(flowFile).getValue();
- final String tableName = context.getProperty(TABLE_NAME).evaluateAttributeExpressions(flowFile).getValue();
- final SchemaKey schemaKey = new SchemaKey(catalog, tableName);
- final boolean includePrimaryKeys = UPDATE_TYPE.equals(statementType) && updateKeys == null;
-
- // get the database schema from the cache, if one exists. We do this in a synchronized block, rather than
- // using a ConcurrentMap because the Map that we are using is a LinkedHashMap with a capacity such that if
- // the Map grows beyond this capacity, old elements are evicted. We do this in order to avoid filling the
- // Java Heap if there are a lot of different SQL statements being generated that reference different tables.
- TableSchema schema;
- synchronized (this) {
- schema = schemaCache.get(schemaKey);
- if (schema == null) {
- // No schema exists for this table yet. Query the database to determine the schema and put it into the cache.
- final DBCPService dbcpService = context.getProperty(CONNECTION_POOL).asControllerService(DBCPService.class);
- try (final Connection conn = dbcpService.getConnection()) {
- schema = TableSchema.from(conn, catalog, tableName, translateFieldNames, includePrimaryKeys);
- schemaCache.put(schemaKey, schema);
- } catch (final SQLException e) {
- getLogger().error("Failed to convert {} into a SQL statement due to {}; routing to failure", new Object[] {flowFile, e.toString()}, e);
- session.transfer(flowFile, REL_FAILURE);
- return;
- }
- }
- }
-
- // Parse the JSON document
- final ObjectMapper mapper = new ObjectMapper();
- final ObjectHolder<JsonNode> rootNodeRef = new ObjectHolder<>(null);
- try {
- session.read(flowFile, new InputStreamCallback() {
- @Override
- public void process(final InputStream in) throws IOException {
- try (final InputStream bufferedIn = new BufferedInputStream(in)) {
- rootNodeRef.set(mapper.readTree(bufferedIn));
- }
- }
- });
- } catch (final ProcessException pe) {
- getLogger().error("Failed to parse {} as JSON due to {}; routing to failure", new Object[] {flowFile, pe.toString()}, pe);
- session.transfer(flowFile, REL_FAILURE);
- return;
- }
-
- final JsonNode rootNode = rootNodeRef.get();
-
- // The node may or may not be a Json Array. If it isn't, we will create an
- // ArrayNode and add just the root node to it. We do this so that we can easily iterate
- // over the array node, rather than duplicating the logic or creating another function that takes many variables
- // in order to implement the logic.
- final ArrayNode arrayNode;
- if (rootNode.isArray()) {
- arrayNode = (ArrayNode) rootNode;
- } else {
- final JsonNodeFactory nodeFactory = JsonNodeFactory.instance;
- arrayNode = new ArrayNode(nodeFactory);
- arrayNode.add(rootNode);
- }
-
- final String fragmentIdentifier = UUID.randomUUID().toString();
-
- final Set<FlowFile> created = new HashSet<>();
- for (int i=0; i < arrayNode.size(); i++) {
- final JsonNode jsonNode = arrayNode.get(i);
-
- final String sql;
- final Map<String, String> attributes = new HashMap<>();
-
- try {
- if (INSERT_TYPE.equals(statementType)) {
- sql = generateInsert(jsonNode, attributes, tableName, schema, translateFieldNames, ignoreUnmappedFields);
- } else {
- sql = generateUpdate(jsonNode, attributes, tableName, updateKeys, schema, translateFieldNames, ignoreUnmappedFields);
- }
- } catch (final ProcessException pe) {
- getLogger().error("Failed to convert {} to a SQL {} statement due to {}; routing to failure",
- new Object[] { flowFile, statementType, pe.toString() }, pe);
- session.remove(created);
- session.transfer(flowFile, REL_FAILURE);
- return;
- }
-
- FlowFile sqlFlowFile = session.create(flowFile);
- created.add(sqlFlowFile);
-
- sqlFlowFile = session.write(sqlFlowFile, new OutputStreamCallback() {
- @Override
- public void process(final OutputStream out) throws IOException {
- out.write(sql.getBytes(StandardCharsets.UTF_8));
- }
- });
-
- attributes.put(CoreAttributes.MIME_TYPE.key(), "text/plain");
- attributes.put("sql.table", tableName);
- attributes.put("fragment.identifier", fragmentIdentifier);
- attributes.put("fragment.count", String.valueOf(arrayNode.size()));
- attributes.put("fragment.index", String.valueOf(i));
-
- if (catalog != null) {
- attributes.put("sql.catalog", catalog);
- }
-
- sqlFlowFile = session.putAllAttributes(sqlFlowFile, attributes);
- session.transfer(sqlFlowFile, REL_SQL);
- }
-
- session.transfer(flowFile, REL_ORIGINAL);
- }
-
- private Set<String> getNormalizedColumnNames(final JsonNode node, final boolean translateFieldNames) {
- final Set<String> normalizedFieldNames = new HashSet<>();
- final Iterator<String> fieldNameItr = node.getFieldNames();
- while (fieldNameItr.hasNext()) {
- normalizedFieldNames.add(normalizeColumnName(fieldNameItr.next(), translateFieldNames));
- }
-
- return normalizedFieldNames;
- }
-
- private String generateInsert(final JsonNode rootNode, final Map<String, String> attributes, final String tableName,
- final TableSchema schema, final boolean translateFieldNames, final boolean ignoreUnmappedFields) {
-
- final Set<String> normalizedFieldNames = getNormalizedColumnNames(rootNode, translateFieldNames);
- for (final String requiredColName : schema.getRequiredColumnNames()) {
- final String normalizedColName = normalizeColumnName(requiredColName, translateFieldNames);
- if (!normalizedFieldNames.contains(normalizedColName)) {
- throw new ProcessException("JSON does not have a value for the Required column '" + requiredColName + "'");
- }
- }
-
- final StringBuilder sqlBuilder = new StringBuilder();
- int fieldCount = 0;
- sqlBuilder.append("INSERT INTO ").append(tableName).append(" (");
-
- // iterate over all of the elements in the JSON, building the SQL statement by adding the column names, as well as
- // adding each column's value to a "sql.args.N.value" attribute and its type to a "sql.args.N.type" attribute, for
- // the columns that we are inserting into
- final Iterator<String> fieldNames = rootNode.getFieldNames();
- while (fieldNames.hasNext()) {
- final String fieldName = fieldNames.next();
-
- final ColumnDescription desc = schema.getColumns().get(normalizeColumnName(fieldName, translateFieldNames));
- if (desc == null && !ignoreUnmappedFields) {
- throw new ProcessException("Cannot map JSON field '" + fieldName + "' to any column in the database");
- }
-
- if (desc != null) {
- if (fieldCount++ > 0) {
- sqlBuilder.append(", ");
- }
-
- sqlBuilder.append(desc.getColumnName());
-
- final int sqlType = desc.getDataType();
- attributes.put("sql.args." + fieldCount + ".type", String.valueOf(sqlType));
-
- final Integer colSize = desc.getColumnSize();
- final JsonNode fieldNode = rootNode.get(fieldName);
- if (!fieldNode.isNull()) {
- String fieldValue = fieldNode.asText();
- if (colSize != null && fieldValue.length() > colSize) {
- fieldValue = fieldValue.substring(0, colSize);
- }
- attributes.put("sql.args." + fieldCount + ".value", fieldValue);
- }
- }
- }
-
- // complete the SQL statement by adding ?'s as placeholders for each of the parameter values
- sqlBuilder.append(") VALUES (");
- for (int i=0; i < fieldCount; i++) {
- if (i > 0) {
- sqlBuilder.append(", ");
- }
-
- sqlBuilder.append("?");
- }
- sqlBuilder.append(")");
-
- if (fieldCount == 0) {
- throw new ProcessException("None of the fields in the JSON map to the columns defined by the " + tableName + " table");
- }
-
- return sqlBuilder.toString();
- }
-
- private String generateUpdate(final JsonNode rootNode, final Map<String, String> attributes, final String tableName, final String updateKeys,
- final TableSchema schema, final boolean translateFieldNames, final boolean ignoreUnmappedFields) {
-
- final Set<String> updateKeyNames;
- if (updateKeys == null) {
- updateKeyNames = schema.getPrimaryKeyColumnNames();
- } else {
- updateKeyNames = new HashSet<>();
- for (final String updateKey : updateKeys.split(",")) {
- updateKeyNames.add(updateKey.trim());
- }
- }
-
- if (updateKeyNames.isEmpty()) {
- throw new ProcessException("Table '" + tableName + "' does not have a Primary Key and no Update Keys were specified");
- }
-
- final StringBuilder sqlBuilder = new StringBuilder();
- int fieldCount = 0;
- sqlBuilder.append("UPDATE ").append(tableName).append(" SET ");
-
-
- // Create a Set of all normalized Update Key names, and ensure that there is a field in the JSON
- // for each of the Update Key fields.
- final Set<String> normalizedFieldNames = getNormalizedColumnNames(rootNode, translateFieldNames);
- final Set<String> normalizedUpdateNames = new HashSet<>();
- for (final String uk : updateKeyNames) {
- final String normalizedUK = normalizeColumnName(uk, translateFieldNames);
- normalizedUpdateNames.add(normalizedUK);
-
- if (!normalizedFieldNames.contains(normalizedUK)) {
- throw new ProcessException("JSON does not have a value for the " + (updateKeys == null ? "Primary" : "Update") + "Key column '" + uk + "'");
- }
- }
-
- // iterate over all of the elements in the JSON, building the SQL statement by adding the column names, as well as
- // adding each column's value to a "sql.args.N.value" attribute and its type to a "sql.args.N.type" attribute, for
- // the columns that we are updating
- Iterator<String> fieldNames = rootNode.getFieldNames();
- while (fieldNames.hasNext()) {
- final String fieldName = fieldNames.next();
-
- final String normalizedColName = normalizeColumnName(fieldName, translateFieldNames);
- final ColumnDescription desc = schema.getColumns().get(normalizedColName);
-
- if (desc == null) {
- if (!ignoreUnmappedFields) {
- throw new ProcessException("Cannot map JSON field '" + fieldName + "' to any column in the database");
- } else {
- continue;
- }
- }
-
- // Check if this column is an Update Key. If so, skip it for now. We will come
- // back to it after we finish the SET clause
- if (normalizedUpdateNames.contains(normalizedColName)) {
- continue;
- }
-
- if (fieldCount++ > 0) {
- sqlBuilder.append(", ");
- }
-
- sqlBuilder.append(desc.getColumnName()).append(" = ?");
- final int sqlType = desc.getDataType();
- attributes.put("sql.args." + fieldCount + ".type", String.valueOf(sqlType));
-
- final Integer colSize = desc.getColumnSize();
-
- final JsonNode fieldNode = rootNode.get(fieldName);
- if (!fieldNode.isNull()) {
- String fieldValue = fieldNode.asText();
- if (colSize != null && fieldValue.length() > colSize) {
- fieldValue = fieldValue.substring(0, colSize);
- }
- attributes.put("sql.args." + fieldCount + ".value", fieldValue);
- }
- }
-
- // Set the WHERE clause based on the Update Key values
- sqlBuilder.append(" WHERE ");
-
- fieldNames = rootNode.getFieldNames();
- int whereFieldCount = 0;
- while (fieldNames.hasNext()) {
- final String fieldName = fieldNames.next();
-
- final String normalizedColName = normalizeColumnName(fieldName, translateFieldNames);
- final ColumnDescription desc = schema.getColumns().get(normalizedColName);
- if (desc == null) {
- continue;
- }
-
- // Only include this column in the WHERE clause if it is an Update Key; all other
- // columns were already handled above when building the SET clause
- if (!normalizedUpdateNames.contains(normalizedColName)) {
- continue;
- }
-
- if (whereFieldCount++ > 0) {
- sqlBuilder.append(" AND ");
- }
- fieldCount++;
-
- sqlBuilder.append(normalizedColName).append(" = ?");
- final int sqlType = desc.getDataType();
- attributes.put("sql.args." + fieldCount + ".type", String.valueOf(sqlType));
-
- final Integer colSize = desc.getColumnSize();
- String fieldValue = rootNode.get(fieldName).asText();
- if (colSize != null && fieldValue.length() > colSize) {
- fieldValue = fieldValue.substring(0, colSize);
- }
- attributes.put("sql.args." + fieldCount + ".value", fieldValue);
- }
-
- return sqlBuilder.toString();
- }
-
- private static String normalizeColumnName(final String colName, final boolean translateColumnNames) {
- return translateColumnNames ? colName.toUpperCase().replace("_", "") : colName;
- }
-
- private static class TableSchema {
- private List<String> requiredColumnNames;
- private Set<String> primaryKeyColumnNames;
- private Map<String, ColumnDescription> columns;
-
- private TableSchema(final List<ColumnDescription> columnDescriptions, final boolean translateColumnNames,
- final Set<String> primaryKeyColumnNames) {
- this.columns = new HashMap<>();
- this.primaryKeyColumnNames = primaryKeyColumnNames;
-
- this.requiredColumnNames = new ArrayList<>();
- for (final ColumnDescription desc : columnDescriptions) {
- columns.put(ConvertJSONToSQL.normalizeColumnName(desc.columnName, translateColumnNames), desc);
- if (desc.isRequired()) {
- requiredColumnNames.add(desc.columnName);
- }
- }
- }
-
- public Map<String, ColumnDescription> getColumns() {
- return columns;
- }
-
- public List<String> getRequiredColumnNames() {
- return requiredColumnNames;
- }
-
- public Set<String> getPrimaryKeyColumnNames() {
- return primaryKeyColumnNames;
- }
-
- public static TableSchema from(final Connection conn, final String catalog, final String tableName,
- final boolean translateColumnNames, final boolean includePrimaryKeys) throws SQLException {
- try (final ResultSet colrs = conn.getMetaData().getColumns(catalog, null, tableName, "%")) {
-
- final List<ColumnDescription> cols = new ArrayList<>();
- while (colrs.next()) {
- final ColumnDescription col = ColumnDescription.from(colrs);
- cols.add(col);
- }
-
- final Set<String> primaryKeyColumns = new HashSet<>();
- if (includePrimaryKeys) {
- try (final ResultSet pkrs = conn.getMetaData().getPrimaryKeys(catalog, null, tableName)) {
-
- while (pkrs.next()) {
- final String colName = pkrs.getString("COLUMN_NAME");
- primaryKeyColumns.add(normalizeColumnName(colName, translateColumnNames));
- }
- }
- }
-
- return new TableSchema(cols, translateColumnNames, primaryKeyColumns);
- }
- }
- }
-
- private static class ColumnDescription {
- private final String columnName;
- private final int dataType;
- private final boolean required;
- private final Integer columnSize;
-
- private ColumnDescription(final String columnName, final int dataType, final boolean required, final Integer columnSize) {
- this.columnName = columnName;
- this.dataType = dataType;
- this.required = required;
- this.columnSize = columnSize;
- }
-
- public int getDataType() {
- return dataType;
- }
-
- public Integer getColumnSize() {
- return columnSize;
- }
-
- public String getColumnName() {
- return columnName;
- }
-
- public boolean isRequired() {
- return required;
- }
-
- public static ColumnDescription from(final ResultSet resultSet) throws SQLException {
- final String columnName = resultSet.getString("COLUMN_NAME");
- final int dataType = resultSet.getInt("DATA_TYPE");
- final int colSize = resultSet.getInt("COLUMN_SIZE");
-
- final String nullableValue = resultSet.getString("IS_NULLABLE");
- final boolean isNullable = "YES".equalsIgnoreCase(nullableValue) || nullableValue.isEmpty();
- final String defaultValue = resultSet.getString("COLUMN_DEF");
- final String autoIncrementValue = resultSet.getString("IS_AUTOINCREMENT");
- final boolean isAutoIncrement = "YES".equalsIgnoreCase(autoIncrementValue);
- final boolean required = !isNullable && !isAutoIncrement && defaultValue == null;
-
- return new ColumnDescription(columnName, dataType, required, colSize == 0 ? null : colSize);
- }
- }
-
- private static class SchemaKey {
- private final String catalog;
- private final String tableName;
-
- public SchemaKey(final String catalog, final String tableName) {
- this.catalog = catalog;
- this.tableName = tableName;
- }
-
- @Override
- public int hashCode() {
- final int prime = 31;
- int result = 1;
- result = prime * result + ((catalog == null) ? 0 : catalog.hashCode());
- result = prime * result + ((tableName == null) ? 0 : tableName.hashCode());
- return result;
- }
-
- @Override
- public boolean equals(final Object obj) {
- if (this == obj) {
- return true;
- }
- if (obj == null) {
- return false;
- }
- if (getClass() != obj.getClass()) {
- return false;
- }
-
- final SchemaKey other = (SchemaKey) obj;
- if (catalog == null) {
- if (other.catalog != null) {
- return false;
- }
- } else if (!catalog.equals(other.catalog)) {
- return false;
- }
-
-
- if (tableName == null) {
- if (other.tableName != null) {
- return false;
- }
- } else if (!tableName.equals(other.tableName)) {
- return false;
- }
-
- return true;
- }
- }
-}
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.standard;
+
+import java.io.BufferedInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.nio.charset.StandardCharsets;
+import java.sql.Connection;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.SideEffectFree;
+import org.apache.nifi.annotation.behavior.SupportsBatching;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.dbcp.DBCPService;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.io.InputStreamCallback;
+import org.apache.nifi.processor.io.OutputStreamCallback;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.util.ObjectHolder;
+import org.codehaus.jackson.JsonNode;
+import org.codehaus.jackson.map.ObjectMapper;
+import org.codehaus.jackson.node.ArrayNode;
+import org.codehaus.jackson.node.JsonNodeFactory;
+
+@SideEffectFree
+@SupportsBatching
+@SeeAlso(PutSQL.class)
+@InputRequirement(Requirement.INPUT_REQUIRED)
+@Tags({"json", "sql", "database", "rdbms", "insert", "update", "relational", "flat"})
+@CapabilityDescription("Converts a JSON-formatted FlowFile into an UPDATE or INSERT SQL statement. The incoming FlowFile is expected to be "
+ + "\"flat\" JSON message, meaning that it consists of a single JSON element and each field maps to a simple type. If a field maps to "
+ + "a JSON object, that JSON object will be interpreted as Text. If the input is an array of JSON elements, each element in the array is "
+ + "output as a separate FlowFile to the 'sql' relationship. Upon successful conversion, the original FlowFile is routed to the 'original' "
+ + "relationship and the SQL is routed to the 'sql' relationship.")
+@WritesAttributes({
+ @WritesAttribute(attribute="mime.type", description="Sets mime.type of FlowFile that is routed to 'sql' to 'text/plain'."),
+ @WritesAttribute(attribute="sql.table", description="Sets the sql.table attribute of FlowFile that is routed to 'sql' to the name of the table that is updated by the SQL statement."),
+ @WritesAttribute(attribute="sql.catalog", description="If the Catalog name is set for this database, specifies the name of the catalog that the SQL statement will update. "
+ + "If no catalog is used, this attribute will not be added."),
+ @WritesAttribute(attribute="fragment.identifier", description="All FlowFiles routed to the 'sql' relationship for the same incoming FlowFile (multiple will be output for the same incoming "
+ + "FlowFile if the incoming FlowFile is a JSON Array) will have the same value for the fragment.identifier attribute. This can then be used to correlate the results."),
+ @WritesAttribute(attribute="fragment.count", description="The number of SQL FlowFiles that were produced for same incoming FlowFile. This can be used in conjunction with the "
+ + "fragment.identifier attribute in order to know how many FlowFiles belonged to the same incoming FlowFile."),
+ @WritesAttribute(attribute="fragment.index", description="The position of this FlowFile in the list of outgoing FlowFiles that were all derived from the same incoming FlowFile. This can be "
+ + "used in conjunction with the fragment.identifier and fragment.count attributes to know which FlowFiles originated from the same incoming FlowFile and in what order the SQL "
+ + "FlowFiles were produced"),
+ @WritesAttribute(attribute="sql.args.N.type", description="The output SQL statements are parameterized in order to avoid SQL Injection Attacks. The types of the Parameters "
+ + "to use are stored in attributes named sql.args.1.type, sql.args.2.type, sql.args.3.type, and so on. The type is a number representing a JDBC Type constant. "
+ + "Generally, this is useful only for software to read and interpret but is added so that a processor such as PutSQL can understand how to interpret the values."),
+ @WritesAttribute(attribute="sql.args.N.value", description="The output SQL statements are parameterized in order to avoid SQL Injection Attacks. The values of the Parameters "
+ + "to use are stored in the attributes named sql.args.1.value, sql.args.2.value, sql.args.3.value, and so on. Each of these attributes has a corresponding "
+ + "sql.args.N.type attribute that indicates how the value should be interpreted when inserting it into the database.")
+})
+public class ConvertJSONToSQL extends AbstractProcessor {
+ private static final String UPDATE_TYPE = "UPDATE";
+ private static final String INSERT_TYPE = "INSERT";
+
+ static final AllowableValue IGNORE_UNMATCHED_FIELD = new AllowableValue("Ignore Unmatched Fields", "Ignore Unmatched Fields",
+ "Any field in the JSON document that cannot be mapped to a column in the database is ignored");
+ static final AllowableValue FAIL_UNMATCHED_FIELD = new AllowableValue("Fail", "Fail",
+ "If the JSON document has any field that cannot be mapped to a column in the database, the FlowFile will be routed to the failure relationship");
+
+ static final PropertyDescriptor CONNECTION_POOL = new PropertyDescriptor.Builder()
+ .name("JDBC Connection Pool")
+ .description("Specifies the JDBC Connection Pool to use in order to convert the JSON message to a SQL statement. "
+ + "The Connection Pool is necessary in order to determine the appropriate database column types.")
+ .identifiesControllerService(DBCPService.class)
+ .required(true)
+ .build();
+ static final PropertyDescriptor STATEMENT_TYPE = new PropertyDescriptor.Builder()
+ .name("Statement Type")
+ .description("Specifies the type of SQL Statement to generate")
+ .required(true)
+ .allowableValues(UPDATE_TYPE, INSERT_TYPE)
+ .build();
+ static final PropertyDescriptor TABLE_NAME = new PropertyDescriptor.Builder()
+ .name("Table Name")
+ .description("The name of the table that the statement should update")
+ .required(true)
+ .expressionLanguageSupported(true)
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+ .build();
+ static final PropertyDescriptor CATALOG_NAME = new PropertyDescriptor.Builder()
+ .name("Catalog Name")
+ .description("The name of the catalog that the statement should update. This may not apply for the database that you are updating. In this case, leave the field empty")
+ .required(false)
+ .expressionLanguageSupported(true)
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+ .build();
+ static final PropertyDescriptor TRANSLATE_FIELD_NAMES = new PropertyDescriptor.Builder()
+ .name("Translate Field Names")
+ .description("If true, the Processor will attempt to translate JSON field names into the appropriate column names for the table specified. "
+ + "If false, the JSON field names must match the column names exactly, or the column will not be updated")
+ .allowableValues("true", "false")
+ .defaultValue("true")
+ .build();
+ static final PropertyDescriptor UNMATCHED_FIELD_BEHAVIOR = new PropertyDescriptor.Builder()
+ .name("Unmatched Field Behavior")
+ .description("If an incoming JSON element has a field that does not map to any of the database table's columns, this property specifies how to handle the situation")
+ .allowableValues(IGNORE_UNMATCHED_FIELD, FAIL_UNMATCHED_FIELD)
+ .defaultValue(IGNORE_UNMATCHED_FIELD.getValue())
+ .build();
+ static final PropertyDescriptor UPDATE_KEY = new PropertyDescriptor.Builder()
+ .name("Update Keys")
+ .description("A comma-separated list of column names that uniquely identifies a row in the database for UPDATE statements. "
+ + "If the Statement Type is UPDATE and this property is not set, the table's Primary Keys are used. "
+ + "In this case, if no Primary Key exists, the conversion to SQL will fail. "
+ + "This property is ignored if the Statement Type is INSERT")
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+ .required(false)
+ .expressionLanguageSupported(true)
+ .build();
+
+
+ static final Relationship REL_ORIGINAL = new Relationship.Builder()
+ .name("original")
+ .description("When a FlowFile is converted to SQL, the original JSON FlowFile is routed to this relationship")
+ .build();
+ static final Relationship REL_SQL = new Relationship.Builder()
+ .name("sql")
+ .description("A FlowFile is routed to this relationship when its contents have successfully been converted into a SQL statement")
+ .build();
+ static final Relationship REL_FAILURE = new Relationship.Builder()
+ .name("failure")
+ .description("A FlowFile is routed to this relationship if it cannot be converted into a SQL statement. Common causes include invalid JSON "
+ + "content or the JSON content missing a required field (if using an INSERT statement type).")
+ .build();
+
+ private final Map<SchemaKey, TableSchema> schemaCache = new LinkedHashMap<SchemaKey, TableSchema>(100) {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ protected boolean removeEldestEntry(Map.Entry<SchemaKey,TableSchema> eldest) {
+ return size() >= 100;
+ }
+ };
+
+ @Override
+ protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+ final List<PropertyDescriptor> properties = new ArrayList<>();
+ properties.add(CONNECTION_POOL);
+ properties.add(STATEMENT_TYPE);
+ properties.add(TABLE_NAME);
+ properties.add(CATALOG_NAME);
+ properties.add(TRANSLATE_FIELD_NAMES);
+ properties.add(UNMATCHED_FIELD_BEHAVIOR);
+ properties.add(UPDATE_KEY);
+ return properties;
+ }
+
+
+ @Override
+ public Set<Relationship> getRelationships() {
+ final Set<Relationship> rels = new HashSet<>();
+ rels.add(REL_ORIGINAL);
+ rels.add(REL_SQL);
+ rels.add(REL_FAILURE);
+ return rels;
+ }
+
+
+ @OnScheduled
+ public void onScheduled(final ProcessContext context) {
+ synchronized (this) {
+ schemaCache.clear();
+ }
+ }
+
+ @Override
+ public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
+ FlowFile flowFile = session.get();
+ if (flowFile == null) {
+ return;
+ }
+
+ final boolean translateFieldNames = context.getProperty(TRANSLATE_FIELD_NAMES).asBoolean();
+ final boolean ignoreUnmappedFields = IGNORE_UNMATCHED_FIELD.getValue().equalsIgnoreCase(context.getProperty(UNMATCHED_FIELD_BEHAVIOR).getValue());
+ final String statementType = context.getProperty(STATEMENT_TYPE).getValue();
+ final String updateKeys = context.getProperty(UPDATE_KEY).evaluateAttributeExpressions(flowFile).getValue();
+
+ final String catalog = context.getProperty(CATALOG_NAME).evaluateAttributeExpressions(flowFile).getValue();
+ final String tableName = context.getProperty(TABLE_NAME).evaluateAttributeExpressions(flowFile).getValue();
+ final SchemaKey schemaKey = new SchemaKey(catalog, tableName);
+ final boolean includePrimaryKeys = UPDATE_TYPE.equals(statementType) && updateKeys == null;
+
+ // get the database schema from the cache, if one exists. We do this in a synchronized block, rather than
+ // using a ConcurrentMap because the Map that we are using is a LinkedHashMap with a capacity such that if
+ // the Map grows beyond this capacity, old elements are evicted. We do this in order to avoid filling the
+ // Java Heap if there are a lot of different SQL statements being generated that reference different tables.
+ TableSchema schema;
+ synchronized (this) {
+ schema = schemaCache.get(schemaKey);
+ if (schema == null) {
+ // No schema exists for this table yet. Query the database to determine the schema and put it into the cache.
+ final DBCPService dbcpService = context.getProperty(CONNECTION_POOL).asControllerService(DBCPService.class);
+ try (final Connection conn = dbcpService.getConnection()) {
+ schema = TableSchema.from(conn, catalog, tableName, translateFieldNames, includePrimaryKeys);
+ schemaCache.put(schemaKey, schema);
+ } catch (final SQLException e) {
+ getLogger().error("Failed to convert {} into a SQL statement due to {}; routing to failure", new Object[] {flowFile, e.toString()}, e);
+ session.transfer(flowFile, REL_FAILURE);
+ return;
+ }
+ }
+ }
+
+ // Parse the JSON document
+ final ObjectMapper mapper = new ObjectMapper();
+ final ObjectHolder<JsonNode> rootNodeRef = new ObjectHolder<>(null);
+ try {
+ session.read(flowFile, new InputStreamCallback() {
+ @Override
+ public void process(final InputStream in) throws IOException {
+ try (final InputStream bufferedIn = new BufferedInputStream(in)) {
+ rootNodeRef.set(mapper.readTree(bufferedIn));
+ }
+ }
+ });
+ } catch (final ProcessException pe) {
+ getLogger().error("Failed to parse {} as JSON due to {}; routing to failure", new Object[] {flowFile, pe.toString()}, pe);
+ session.transfer(flowFile, REL_FAILURE);
+ return;
+ }
+
+ final JsonNode rootNode = rootNodeRef.get();
+
+ // The node may or may not be a Json Array. If it isn't, we will create an
+ // ArrayNode and add just the root node to it. We do this so that we can easily iterate
+ // over the array node, rather than duplicating the logic or creating another function that takes many variables
+ // in order to implement the logic.
+ final ArrayNode arrayNode;
+ if (rootNode.isArray()) {
+ arrayNode = (ArrayNode) rootNode;
+ } else {
+ final JsonNodeFactory nodeFactory = JsonNodeFactory.instance;
+ arrayNode = new ArrayNode(nodeFactory);
+ arrayNode.add(rootNode);
+ }
+
+ final String fragmentIdentifier = UUID.randomUUID().toString();
+
+ final Set<FlowFile> created = new HashSet<>();
+ for (int i=0; i < arrayNode.size(); i++) {
+ final JsonNode jsonNode = arrayNode.get(i);
+
+ final String sql;
+ final Map<String, String> attributes = new HashMap<>();
+
+ try {
+ if (INSERT_TYPE.equals(statementType)) {
+ sql = generateInsert(jsonNode, attributes, tableName, schema, translateFieldNames, ignoreUnmappedFields);
+ } else {
+ sql = generateUpdate(jsonNode, attributes, tableName, updateKeys, schema, translateFieldNames, ignoreUnmappedFields);
+ }
+ } catch (final ProcessException pe) {
+ getLogger().error("Failed to convert {} to a SQL {} statement due to {}; routing to failure",
+ new Object[] { flowFile, statementType, pe.toString() }, pe);
+ session.remove(created);
+ session.transfer(flowFile, REL_FAILURE);
+ return;
+ }
+
+ FlowFile sqlFlowFile = session.create(flowFile);
+ created.add(sqlFlowFile);
+
+ sqlFlowFile = session.write(sqlFlowFile, new OutputStreamCallback() {
+ @Override
+ public void process(final OutputStream out) throws IOException {
+ out.write(sql.getBytes(StandardCharsets.UTF_8));
+ }
+ });
+
+ attributes.put(CoreAttributes.MIME_TYPE.key(), "text/plain");
+ attributes.put("sql.table", tableName);
+ attributes.put("fragment.identifier", fragmentIdentifier);
+ attributes.put("fragment.count", String.valueOf(arrayNode.size()));
+ attributes.put("fragment.index", String.valueOf(i));
+
+ if (catalog != null) {
+ attributes.put("sql.catalog", catalog);
+ }
+
+ sqlFlowFile = session.putAllAttributes(sqlFlowFile, attributes);
+ session.transfer(sqlFlowFile, REL_SQL);
+ }
+
+ session.transfer(flowFile, REL_ORIGINAL);
+ }
+
+ private Set<String> getNormalizedColumnNames(final JsonNode node, final boolean translateFieldNames) {
+ final Set<String> normalizedFieldNames = new HashSet<>();
+ final Iterator<String> fieldNameItr = node.getFieldNames();
+ while (fieldNameItr.hasNext()) {
+ normalizedFieldNames.add(normalizeColumnName(fieldNameItr.next(), translateFieldNames));
+ }
+
+ return normalizedFieldNames;
+ }
+
+ private String generateInsert(final JsonNode rootNode, final Map<String, String> attributes, final String tableName,
+ final TableSchema schema, final boolean translateFieldNames, final boolean ignoreUnmappedFields) {
+
+ final Set<String> normalizedFieldNames = getNormalizedColumnNames(rootNode, translateFieldNames);
+ for (final String requiredColName : schema.getRequiredColumnNames()) {
+ final String normalizedColName = normalizeColumnName(requiredColName, translateFieldNames);
+ if (!normalizedFieldNames.contains(normalizedColName)) {
+ throw new ProcessException("JSON does not have a value for the Required column '" + requiredColName + "'");
+ }
+ }
+
+ final StringBuilder sqlBuilder = new StringBuilder();
+ int fieldCount = 0;
+ sqlBuilder.append("INSERT INTO ").append(tableName).append(" (");
+
+ // iterate over all of the elements in the JSON, building the SQL statement by adding the column names, as well as
+ // adding each column's value to a "sql.args.N.value" attribute and its type to a "sql.args.N.type" attribute, for
+ // the columns that we are inserting into
+ final Iterator<String> fieldNames = rootNode.getFieldNames();
+ while (fieldNames.hasNext()) {
+ final String fieldName = fieldNames.next();
+
+ final ColumnDescription desc = schema.getColumns().get(normalizeColumnName(fieldName, translateFieldNames));
+ if (desc == null && !ignoreUnmappedFields) {
+ throw new ProcessException("Cannot map JSON field '" + fieldName + "' to any column in the database");
+ }
+
+ if (desc != null) {
+ if (fieldCount++ > 0) {
+ sqlBuilder.append(", ");
+ }
+
+ sqlBuilder.append(desc.getColumnName());
+
+ final int sqlType = desc.getDataType();
+ attributes.put("sql.args." + fieldCount + ".type", String.valueOf(sqlType));
+
+ final Integer colSize = desc.getColumnSize();
+ final JsonNode fieldNode = rootNode.get(fieldName);
+ if (!fieldNode.isNull()) {
+ String fieldValue = fieldNode.asText();
+ if (colSize != null && fieldValue.length() > colSize) {
+ fieldValue = fieldValue.substring(0, colSize);
+ }
+ attributes.put("sql.args." + fieldCount + ".value", fieldValue);
+ }
+ }
+ }
+
+ // complete the SQL statement by adding ?'s as placeholders for each of the parameter values
+ sqlBuilder.append(") VALUES (");
+ for (int i=0; i < fieldCount; i++) {
+ if (i > 0) {
+ sqlBuilder.append(", ");
+ }
+
+ sqlBuilder.append("?");
+ }
+ sqlBuilder.append(")");
+
+ if (fieldCount == 0) {
+ throw new ProcessException("None of the fields in the JSON map to the columns defined by the " + tableName + " table");
+ }
+
+ return sqlBuilder.toString();
+ }
+
+ private String generateUpdate(final JsonNode rootNode, final Map<String, String> attributes, final String tableName, final String updateKeys,
+ final TableSchema schema, final boolean translateFieldNames, final boolean ignoreUnmappedFields) {
+
+ final Set<String> updateKeyNames;
+ if (updateKeys == null) {
+ updateKeyNames = schema.getPrimaryKeyColumnNames();
+ } else {
+ updateKeyNames = new HashSet<>();
+ for (final String updateKey : updateKeys.split(",")) {
+ updateKeyNames.add(updateKey.trim());
+ }
+ }
+
+ if (updateKeyNames.isEmpty()) {
+ throw new ProcessException("Table '" + tableName + "' does not have a Primary Key and no Update Keys were specified");
+ }
+
+ final StringBuilder sqlBuilder = new StringBuilder();
+ int fieldCount = 0;
+ sqlBuilder.append("UPDATE ").append(tableName).append(" SET ");
+
+
+ // Create a Set of all normalized Update Key names, and ensure that there is a field in the JSON
+ // for each of the Update Key fields.
+ final Set<String> normalizedFieldNames = getNormalizedColumnNames(rootNode, translateFieldNames);
+ final Set<String> normalizedUpdateNames = new HashSet<>();
+ for (final String uk : updateKeyNames) {
+ final String normalizedUK = normalizeColumnName(uk, translateFieldNames);
+ normalizedUpdateNames.add(normalizedUK);
+
+ if (!normalizedFieldNames.contains(normalizedUK)) {
+ throw new ProcessException("JSON does not have a value for the " + (updateKeys == null ? "Primary" : "Update") + "Key column '" + uk + "'");
+ }
+ }
+
+ // iterate over all of the elements in the JSON, building the SQL statement by adding the column names, as well as
+ // adding each column's value to a "sql.args.N.value" attribute and its type to a "sql.args.N.type" attribute, for
+ // the columns that we are updating
+ Iterator<String> fieldNames = rootNode.getFieldNames();
+ while (fieldNames.hasNext()) {
+ final String fieldName = fieldNames.next();
+
+ final String normalizedColName = normalizeColumnName(fieldName, translateFieldNames);
+ final ColumnDescription desc = schema.getColumns().get(normalizedColName);
+
+ if (desc == null) {
+ if (!ignoreUnmappedFields) {
+ throw new ProcessException("Cannot map JSON field '" + fieldName + "' to any column in the database");
+ } else {
+ continue;
+ }
+ }
+
+ // Check if this column is an Update Key. If so, skip it for now. We will come
+ // back to it after we finish the SET clause
+ if (normalizedUpdateNames.contains(normalizedColName)) {
+ continue;
+ }
+
+ if (fieldCount++ > 0) {
+ sqlBuilder.append(", ");
+ }
+
+ sqlBuilder.append(desc.getColumnName()).append(" = ?");
+ final int sqlType = desc.getDataType();
+ attributes.put("sql.args." + fieldCount + ".type", String.valueOf(sqlType));
+
+ final Integer colSize = desc.getColumnSize();
+
+ final JsonNode fieldNode = rootNode.get(fieldName);
+ if (!fieldNode.isNull()) {
+ String fieldValue = fieldNode.asText();
+ if (colSize != null && fieldValue.length() > colSize) {
+ fieldValue = fieldValue.substring(0, colSize);
+ }
+ attributes.put("sql.args." + fieldCount + ".value", fieldValue);
+ }
+ }
+
+ // Set the WHERE clause based on the Update Key values
+ sqlBuilder.append(" WHERE ");
+
+ fieldNames = rootNode.getFieldNames();
+ int whereFieldCount = 0;
+ while (fieldNames.hasNext()) {
+ final String fieldName = fieldNames.next();
+
+ final String normalizedColName = normalizeColumnName(fieldName, translateFieldNames);
+ final ColumnDescription desc = schema.getColumns().get(normalizedColName);
+ if (desc == null) {
+ continue;
+ }
+
+ // Only include this column in the WHERE clause if it is an Update Key; all other
+ // columns were already handled above when building the SET clause
+ if (!normalizedUpdateNames.contains(normalizedColName)) {
+ continue;
+ }
+
+ if (whereFieldCount++ > 0) {
+ sqlBuilder.append(" AND ");
+ }
+ fieldCount++;
+
+ sqlBuilder.append(normalizedColName).append(" = ?");
+ final int sqlType = desc.getDataType();
+ attributes.put("sql.args." + fieldCount + ".type", String.valueOf(sqlType));
+
+ final Integer colSize = desc.getColumnSize();
+ String fieldValue = rootNode.get(fieldName).asText();
+ if (colSize != null && fieldValue.length() > colSize) {
+ fieldValue = fieldValue.substring(0, colSize);
+ }
+ attributes.put("sql.args." + fieldCount + ".value", fieldValue);
+ }
+
+ return sqlBuilder.toString();
+ }
+
+ private static String normalizeColumnName(final String colName, final boolean translateColumnNames) {
+ return translateColumnNames ? colName.toUpperCase().replace("_", "") : colName;
+ }
+
+ private static class TableSchema {
+ private List<String> requiredColumnNames;
+ private Set<String> primaryKeyColumnNames;
+ private Map<String, ColumnDescription> columns;
+
+ private TableSchema(final List<ColumnDescription> columnDescriptions, final boolean translateColumnNames,
+ final Set<String> primaryKeyColumnNames) {
+ this.columns = new HashMap<>();
+ this.primaryKeyColumnNames = primaryKeyColumnNames;
+
+ this.requiredColumnNames = new ArrayList<>();
+ for (final ColumnDescription desc : columnDescriptions) {
+ columns.put(ConvertJSONToSQL.normalizeColumnName(desc.columnName, translateColumnNames), desc);
+ if (desc.isRequired()) {
+ requiredColumnNames.add(desc.columnName);
+ }
+ }
+ }
+
+ public Map<String, ColumnDescription> getColumns() {
+ return columns;
+ }
+
+ public List<String> getRequiredColumnNames() {
+ return requiredColumnNames;
+ }
+
+ public Set<String> getPrimaryKeyColumnNames() {
+ return primaryKeyColumnNames;
+ }
+
+ public static TableSchema from(final Connection conn, final String catalog, final String tableName,
+ final boolean translateColumnNames, final boolean includePrimaryKeys) throws SQLException {
+ try (final ResultSet colrs = conn.getMetaData().getColumns(catalog, null, tableName, "%")) {
+
+ final List<ColumnDescription> cols = new ArrayList<>();
+ while (colrs.next()) {
+ final ColumnDescription col = ColumnDescription.from(colrs);
+ cols.add(col);
+ }
+
+ final Set<String> primaryKeyColumns = new HashSet<>();
+ if (includePrimaryKeys) {
+ try (final ResultSet pkrs = conn.getMetaData().getPrimaryKeys(catalog, null, tableName)) {
+
+ while (pkrs.next()) {
+ final String colName = pkrs.getString("COLUMN_NAME");
+ primaryKeyColumns.add(normalizeColumnName(colName, translateColumnNames));
+ }
+ }
+ }
+
+ return new TableSchema(cols, translateColumnNames, primaryKeyColumns);
+ }
+ }
+ }
+
+ private static class ColumnDescription {
+ private final String columnName;
+ private final int dataType;
+ private final boolean required;
+ private final Integer columnSize;
+
+ private ColumnDescription(final String columnName, final int dataType, final boolean required, final Integer columnSize) {
+ this.columnName = columnName;
+ this.dataType = dataType;
+ this.required = required;
+ this.columnSize = columnSize;
+ }
+
+ public int getDataType() {
+ return dataType;
+ }
+
+ public Integer getColumnSize() {
+ return columnSize;
+ }
+
+ public String getColumnName() {
+ return columnName;
+ }
+
+ public boolean isRequired() {
+ return required;
+ }
+
+ public static ColumnDescription from(final ResultSet resultSet) throws SQLException {
+ final String columnName = resultSet.getString("COLUMN_NAME");
+ final int dataType = resultSet.getInt("DATA_TYPE");
+ final int colSize = resultSet.getInt("COLUMN_SIZE");
+
+ final String nullableValue = resultSet.getString("IS_NULLABLE");
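+ // Per the JDBC spec, an empty IS_NULLABLE string means nullability is unknown, so treat the column as nullable.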
+ final boolean isNullable = "YES".equalsIgnoreCase(nullableValue) || nullableValue.isEmpty();
+ final String defaultValue = resultSet.getString("COLUMN_DEF");
+ final String autoIncrementValue = resultSet.getString("IS_AUTOINCREMENT");
+ final boolean isAutoIncrement = "YES".equalsIgnoreCase(autoIncrementValue);
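+ // A value must be supplied for a column only if it is non-nullable, has no default, and is not auto-incremented.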
+ final boolean required = !isNullable && !isAutoIncrement && defaultValue == null;
+
+ return new ColumnDescription(columnName, dataType, required, colSize == 0 ? null : colSize);
+ }
+ }
+
+ private static class SchemaKey {
+ private final String catalog;
+ private final String tableName;
+
+ public SchemaKey(final String catalog, final String tableName) {
+ this.catalog = catalog;
+ this.tableName = tableName;
+ }
+
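+ // Value-based hashCode/equals over (catalog, tableName) so SchemaKey instances can be used as map keys.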
+ @Override
+ public int hashCode() {
+ final int prime = 31;
+ int result = 1;
+ result = prime * result + ((catalog == null) ? 0 : catalog.hashCode());
+ result = prime * result + ((tableName == null) ? 0 : tableName.hashCode());
+ return result;
+ }
+
+ @Override
+ public boolean equals(final Object obj) {
+ if (this == obj) {
+ return true;
+ }
+ if (obj == null) {
+ return false;
+ }
+ if (getClass() != obj.getClass()) {
+ return false;
+ }
+
+ final SchemaKey other = (SchemaKey) obj;
+ if (catalog == null) {
+ if (other.catalog != null) {
+ return false;
+ }
+ } else if (!catalog.equals(other.catalog)) {
+ return false;
+ }
+
+ if (tableName == null) {
+ if (other.tableName != null) {
+ return false;
+ }
+ } else if (!tableName.equals(other.tableName)) {
+ return false;
+ }
+
+ return true;
+ }
+ }
+}
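A note on how the attributes written above are consumed: every '?' placeholder in the generated statement is numbered from 1 by the fieldCount counter, and each number N gets a matching pair of flow file attributes, sql.args.N.type (the JDBC type) and sql.args.N.value (the value as a String). A downstream processor such as PutSQL binds those attributes onto a JDBC PreparedStatement. The sketch below shows the shape of that binding; the SqlArgsBinder class and bindAndExecute method are illustrative names only, not NiFi API, and the real PutSQL does more thorough per-type handling.

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.Map;

public class SqlArgsBinder {

    // Bind the sql.args.N.* attribute pairs produced by ConvertJSONToSQL onto
    // the '?' placeholders of the generated statement, then execute it.
    public static int bindAndExecute(final Connection conn, final String sql,
            final Map<String, String> attributes) throws SQLException {
        try (final PreparedStatement stmt = conn.prepareStatement(sql)) {
            // Parameters are numbered from 1, matching the fieldCount counter.
            for (int i = 1; attributes.containsKey("sql.args." + i + ".type"); i++) {
                final int jdbcType = Integer.parseInt(attributes.get("sql.args." + i + ".type"));
                final String value = attributes.get("sql.args." + i + ".value");
                if (value == null) {
                    stmt.setNull(i, jdbcType);
                } else {
                    // Values travel as Strings; let the driver coerce to the recorded JDBC type.
                    stmt.setObject(i, value, jdbcType);
                }
            }
            return stmt.executeUpdate();
        }
    }
}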
http://git-wip-us.apache.org/repos/asf/nifi/blob/e2d3d1b7/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/DuplicateFlowFile.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/DuplicateFlowFile.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/DuplicateFlowFile.java
index 021a94f..ecc8e60 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/DuplicateFlowFile.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/DuplicateFlowFile.java
@@ -1,84 +1,84 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.processors.standard;
-
-import java.util.Collections;
-import java.util.List;
-import java.util.Set;
-
-import org.apache.nifi.annotation.behavior.EventDriven;
-import org.apache.nifi.annotation.behavior.InputRequirement;
-import org.apache.nifi.annotation.behavior.SupportsBatching;
-import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
-import org.apache.nifi.annotation.documentation.CapabilityDescription;
-import org.apache.nifi.annotation.documentation.Tags;
-import org.apache.nifi.components.PropertyDescriptor;
-import org.apache.nifi.flowfile.FlowFile;
-import org.apache.nifi.processor.AbstractProcessor;
-import org.apache.nifi.processor.ProcessContext;
-import org.apache.nifi.processor.ProcessSession;
-import org.apache.nifi.processor.Relationship;
-import org.apache.nifi.processor.exception.ProcessException;
-import org.apache.nifi.processor.util.StandardValidators;
-
-@EventDriven
-@SupportsBatching
-@Tags({"test", "load", "duplicate"})
-@InputRequirement(Requirement.INPUT_REQUIRED)
-@CapabilityDescription("Intended for load testing, this processor will create the configured number of copies of each incoming FlowFile")
-public class DuplicateFlowFile extends AbstractProcessor {
-
- static final PropertyDescriptor NUM_COPIES = new PropertyDescriptor.Builder()
- .name("Number of Copies")
- .description("Specifies how many copies of each incoming FlowFile will be made")
- .required(true)
- .expressionLanguageSupported(false)
- .defaultValue("100")
- .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
- .build();
-
- static final Relationship REL_SUCCESS = new Relationship.Builder()
- .name("success")
- .description("The original FlowFile and all copies will be sent to this relationship")
- .build();
-
- @Override
- public Set<Relationship> getRelationships() {
- return Collections.singleton(REL_SUCCESS);
- }
-
- @Override
- protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
- return Collections.singletonList(NUM_COPIES);
- }
-
- @Override
- public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
- final FlowFile flowFile = session.get();
- if (flowFile == null) {
- return;
- }
-
- for (int i = 0; i < context.getProperty(NUM_COPIES).asInteger(); i++) {
- final FlowFile copy = session.clone(flowFile);
- session.transfer(copy, REL_SUCCESS);
- }
-
- session.transfer(flowFile, REL_SUCCESS);
- }
-
-}
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.standard;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.nifi.annotation.behavior.EventDriven;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.SupportsBatching;
+import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+
+@EventDriven
+@SupportsBatching
+@Tags({"test", "load", "duplicate"})
+@InputRequirement(Requirement.INPUT_REQUIRED)
+@CapabilityDescription("Intended for load testing, this processor will create the configured number of copies of each incoming FlowFile")
+public class DuplicateFlowFile extends AbstractProcessor {
+
+ static final PropertyDescriptor NUM_COPIES = new PropertyDescriptor.Builder()
+ .name("Number of Copies")
+ .description("Specifies how many copies of each incoming FlowFile will be made")
+ .required(true)
+ .expressionLanguageSupported(false)
+ .defaultValue("100")
+ .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
+ .build();
+
+ static final Relationship REL_SUCCESS = new Relationship.Builder()
+ .name("success")
+ .description("The original FlowFile and all copies will be sent to this relationship")
+ .build();
+
+ @Override
+ public Set<Relationship> getRelationships() {
+ return Collections.singleton(REL_SUCCESS);
+ }
+
+ @Override
+ protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+ return Collections.singletonList(NUM_COPIES);
+ }
+
+ @Override
+ public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
+ final FlowFile flowFile = session.get();
+ if (flowFile == null) {
+ return;
+ }
+
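+ // Emit NUM_COPIES clones, then pass the original through as well.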
+ for (int i = 0; i < context.getProperty(NUM_COPIES).asInteger(); i++) {
+ final FlowFile copy = session.clone(flowFile);
+ session.transfer(copy, REL_SUCCESS);
+ }
+
+ session.transfer(flowFile, REL_SUCCESS);
+ }
+
+}
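Since the DuplicateFlowFile change is likewise line endings only, the behavior is unchanged: onTrigger clones the incoming FlowFile NUM_COPIES times and routes each clone, plus the original, to success. A minimal test sketch against the nifi-mock framework follows; it assumes JUnit 4 and the TestRunners/TestRunner classes from nifi-mock, and it lives in org.apache.nifi.processors.standard so the package-private NUM_COPIES and REL_SUCCESS constants are visible.

package org.apache.nifi.processors.standard;

import java.nio.charset.StandardCharsets;

import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.Test;

public class TestDuplicateFlowFile {

    @Test
    public void testCopiesPlusOriginal() {
        final TestRunner runner = TestRunners.newTestRunner(DuplicateFlowFile.class);
        runner.setProperty(DuplicateFlowFile.NUM_COPIES, "3");

        runner.enqueue("hello".getBytes(StandardCharsets.UTF_8));
        runner.run();

        // 3 clones + the original = 4 FlowFiles on the success relationship
        runner.assertAllFlowFilesTransferred(DuplicateFlowFile.REL_SUCCESS, 4);
    }
}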