You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by tk...@apache.org on 2015/11/19 07:20:48 UTC

[05/24] nifi git commit: NIFI-1054: Fixing Line endings of source code

http://git-wip-us.apache.org/repos/asf/nifi/blob/e2d3d1b7/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutSQL.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutSQL.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutSQL.java
index c2056fe..7f32e54 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutSQL.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutSQL.java
@@ -1,923 +1,923 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.processors.standard;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.nio.charset.StandardCharsets;
-import java.sql.BatchUpdateException;
-import java.sql.Connection;
-import java.sql.Date;
-import java.sql.PreparedStatement;
-import java.sql.ResultSet;
-import java.sql.SQLException;
-import java.sql.SQLNonTransientException;
-import java.sql.Statement;
-import java.sql.Time;
-import java.sql.Timestamp;
-import java.sql.Types;
-import java.util.ArrayList;
-import java.util.BitSet;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.LinkedHashSet;
-import java.util.List;
-import java.util.ListIterator;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.TimeUnit;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-
-import org.apache.nifi.annotation.behavior.InputRequirement;
-import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
-import org.apache.nifi.annotation.behavior.ReadsAttribute;
-import org.apache.nifi.annotation.behavior.ReadsAttributes;
-import org.apache.nifi.annotation.behavior.SupportsBatching;
-import org.apache.nifi.annotation.behavior.WritesAttribute;
-import org.apache.nifi.annotation.behavior.WritesAttributes;
-import org.apache.nifi.annotation.documentation.CapabilityDescription;
-import org.apache.nifi.annotation.documentation.SeeAlso;
-import org.apache.nifi.annotation.documentation.Tags;
-import org.apache.nifi.components.PropertyDescriptor;
-import org.apache.nifi.dbcp.DBCPService;
-import org.apache.nifi.flowfile.FlowFile;
-import org.apache.nifi.processor.AbstractProcessor;
-import org.apache.nifi.processor.FlowFileFilter;
-import org.apache.nifi.processor.ProcessContext;
-import org.apache.nifi.processor.ProcessSession;
-import org.apache.nifi.processor.Relationship;
-import org.apache.nifi.processor.exception.ProcessException;
-import org.apache.nifi.processor.io.InputStreamCallback;
-import org.apache.nifi.processor.util.StandardValidators;
-import org.apache.nifi.stream.io.StreamUtils;
-
-@SupportsBatching
-@SeeAlso(ConvertJSONToSQL.class)
-@InputRequirement(Requirement.INPUT_REQUIRED)
-@Tags({"sql", "put", "rdbms", "database", "update", "insert", "relational"})
-@CapabilityDescription("Executes a SQL UPDATE or INSERT command. The content of an incoming FlowFile is expected to be the SQL command "
-    + "to execute. The SQL command may use the ? to escape parameters. In this case, the parameters to use must exist as FlowFile attributes "
-    + "with the naming convention sql.args.N.type and sql.args.N.value, where N is a positive integer. The sql.args.N.type is expected to be "
-    + "a number indicating the JDBC Type. The content of the FlowFile is expected to be in UTF-8 format.")
-@ReadsAttributes({
-    @ReadsAttribute(attribute = "fragment.identifier", description = "If the <Support Fragment Transactions> property is true, this attribute is used to determine whether or "
-        + "not two FlowFiles belong to the same transaction."),
-    @ReadsAttribute(attribute = "fragment.count", description = "If the <Support Fragment Transactions> property is true, this attribute is used to determine how many FlowFiles "
-        + "are needed to complete the transaction."),
-    @ReadsAttribute(attribute = "fragment.index", description = "If the <Support Fragment Transactions> property is true, this attribute is used to determine the order that the FlowFiles "
-        + "in a transaction should be evaluated."),
-    @ReadsAttribute(attribute = "sql.args.N.type", description = "Incoming FlowFiles are expected to be parameterized SQL statements. The type of each Parameter is specified as an integer "
-        + "that represents the JDBC Type of the parameter."),
-    @ReadsAttribute(attribute = "sql.args.N.value", description = "Incoming FlowFiles are expected to be parameterized SQL statements. The value of the Parameters are specified as "
-        + "sql.args.1.value, sql.args.2.value, sql.args.3.value, and so on. The type of the sql.args.1.value Parameter is specified by the sql.args.1.type attribute.")
-})
-@WritesAttributes({
-    @WritesAttribute(attribute = "sql.generated.key", description = "If the database generated a key for an INSERT statement and the Obtain Generated Keys property is set to true, "
-        + "this attribute will be added to indicate the generated key, if possible. This feature is not supported by all database vendors.")
-})
-public class PutSQL extends AbstractProcessor {
-
-    // Controller service that supplies the JDBC connection on which onTrigger executes the incoming SQL statements.
-    static final PropertyDescriptor CONNECTION_POOL = new PropertyDescriptor.Builder()
-        .name("JDBC Connection Pool")
-        .description("Specifies the JDBC Connection Pool to use in order to obtain a connection to the database. "
-            + "The SQL statements contained in the incoming FlowFiles are executed against this connection.")
-        .identifiesControllerService(DBCPService.class)
-        .required(true)
-        .build();
-    // Boolean switch ("true"/"false", default "true"); when true, pollFlowFiles pulls FlowFiles through a
-    // TransactionalFlowFileFilter instead of taking a plain batch from the session.
-    static final PropertyDescriptor SUPPORT_TRANSACTIONS = new PropertyDescriptor.Builder()
-        .name("Support Fragmented Transactions")
-        .description("If true, when a FlowFile is consumed by this Processor, the Processor will first check the fragment.identifier and fragment.count attributes of that FlowFile. "
-            + "If the fragment.count value is greater than 1, the Processor will not process any FlowFile with that fragment.identifier until all are available; "
-            + "at that point, it will process all FlowFiles with that fragment.identifier as a single transaction, in the order specified by the FlowFiles' fragment.index attributes. "
-            + "This provides atomicity of those SQL statements. If this value is false, these attributes will be ignored and the updates will occur independent of one another.")
-        .allowableValues("true", "false")
-        .defaultValue("true")
-        .build();
-    // Optional time period with no default value. pollFlowFiles reads it in milliseconds and hands it to
-    // determineRelationship when the polled FlowFiles form a fragmented transaction.
-    static final PropertyDescriptor TRANSACTION_TIMEOUT = new PropertyDescriptor.Builder()
-        .name("Transaction Timeout")
-        .description("If the <Support Fragmented Transactions> property is set to true, specifies how long to wait for all FlowFiles for a particular fragment.identifier attribute "
-            + "to arrive before just transferring all of the FlowFiles with that identifier to the 'failure' relationship")
-        .required(false)
-        .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
-        .build();
-    // Required positive integer (default 100). pollFlowFiles passes it to session.get(int) when
-    // Support Fragmented Transactions is false; the transactional filter path does not receive it.
-    static final PropertyDescriptor BATCH_SIZE = new PropertyDescriptor.Builder()
-        .name("Batch Size")
-        .description("The preferred number of FlowFiles to put to the database in a single transaction")
-        .required(true)
-        .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
-        .defaultValue("100")
-        .build();
-    // Boolean switch ("true"/"false", default "false"); when true, onTrigger executes every statement individually
-    // (no batching) and stores any generated key in the "sql.generated.key" attribute of the FlowFile.
-    static final PropertyDescriptor OBTAIN_GENERATED_KEYS = new PropertyDescriptor.Builder()
-        .name("Obtain Generated Keys")
-        .description("If true, any key that is automatically generated by the database will be added to the FlowFile that generated it using the sql.generated.key attribute. "
-            + "This may result in slightly slower performance and is not supported by all databases.")
-        .allowableValues("true", "false")
-        .defaultValue("false")
-        .build();
-
-    // Destination for FlowFiles whose statement was executed; onTrigger maps every "sent" FlowFile to it.
-    static final Relationship REL_SUCCESS = new Relationship.Builder()
-        .name("success")
-        .description("A FlowFile is routed to this relationship after the database is successfully updated")
-        .build();
-    // Destination used by onTrigger when a SQLException that is not a SQLNonTransientException is caught,
-    // and for statements of a failed batch that were not executed.
-    static final Relationship REL_RETRY = new Relationship.Builder()
-        .name("retry")
-        .description("A FlowFile is routed to this relationship if the database cannot be updated but attempting the operation again may succeed")
-        .build();
-    // Destination used by onTrigger when a SQLNonTransientException is caught, when parameters cannot be set,
-    // or when a statement of a batch is reported as failed.
-    static final Relationship REL_FAILURE = new Relationship.Builder()
-        .name("failure")
-        .description("A FlowFile is routed to this relationship if the database cannot be updated and retrying the operation will also fail, "
-            + "such as an invalid query or an integrity constraint violation")
-        .build();
-
-    // Matches the text "sql.args.<digits>.type"; capture group 1 holds the digits (the parameter index N).
-    private static final Pattern SQL_TYPE_ATTRIBUTE_PATTERN = Pattern.compile("sql\\.args\\.(\\d+)\\.type");
-    // Matches an optional leading minus sign followed by one or more digits.
-    private static final Pattern NUMBER_PATTERN = Pattern.compile("-?\\d+");
-
-    // Names of the FlowFile attributes that describe a fragmented transaction (see the @ReadsAttribute entries on the class).
-    private static final String FRAGMENT_ID_ATTR = "fragment.identifier";
-    private static final String FRAGMENT_INDEX_ATTR = "fragment.index";
-    private static final String FRAGMENT_COUNT_ATTR = "fragment.count";
-
-    /**
-     * Lists the properties this processor exposes, in display order.
-     *
-     * @return a new, mutable list holding the five supported property descriptors
-     */
-    @Override
-    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
-        final List<PropertyDescriptor> descriptors = new ArrayList<>();
-        Collections.addAll(descriptors,
-            CONNECTION_POOL,
-            SUPPORT_TRANSACTIONS,
-            TRANSACTION_TIMEOUT,
-            BATCH_SIZE,
-            OBTAIN_GENERATED_KEYS);
-        return descriptors;
-    }
-
-    /**
-     * Lists the relationships that FlowFiles can be routed to by this processor.
-     *
-     * @return a new, mutable set containing the success, retry and failure relationships
-     */
-    @Override
-    public Set<Relationship> getRelationships() {
-        final Set<Relationship> relationships = new HashSet<>();
-        Collections.addAll(relationships, REL_SUCCESS, REL_RETRY, REL_FAILURE);
-        return relationships;
-    }
-
-
-
-    /**
-     * Polls a group of FlowFiles, executes the SQL held in each one against a connection obtained from the
-     * configured connection pool, and routes every FlowFile to success, retry or failure.
-     *
-     * When Obtain Generated Keys is true each statement is executed on its own; otherwise statements are
-     * added to JDBC batches that are executed after all FlowFiles have been prepared. Destinations are
-     * collected in a map and the transfers are performed at the very end of the method.
-     *
-     * @param context the process context used to read the processor properties
-     * @param session the process session used to read, update and transfer FlowFiles
-     * @throws ProcessException declared by the processor contract
-     */
-    @Override
-    public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
-        final FlowFilePoll poll = pollFlowFiles(context, session);
-        if (poll == null) {
-            return;
-        }
-
-        final List<FlowFile> flowFiles = poll.getFlowFiles();
-        if (flowFiles == null) {
-            return;
-        }
-
-        final long startNanos = System.nanoTime();
-        final boolean obtainKeys = context.getProperty(OBTAIN_GENERATED_KEYS).asBoolean();
-        final Map<String, StatementFlowFileEnclosure> statementMap = new HashMap<>(); // Map SQL to a PreparedStatement and FlowFiles
-        final List<FlowFile> sentFlowFiles = new ArrayList<>(); // flowfiles that have been sent
-        final List<FlowFile> processedFlowFiles = new ArrayList<>(); // all flowfiles that we have processed
-        final Set<StatementFlowFileEnclosure> enclosuresToExecute = new LinkedHashSet<>(); // the enclosures that we've processed
-
-        // Because we can have a transaction that is necessary across multiple FlowFiles, things get complicated when
-        // some FlowFiles have been transferred to a relationship and then there is a failure. As a result, we will just
-        // map all FlowFiles to their destination relationship and do the session.transfer at the end. This way, if there
-        // is a failure, we can route all FlowFiles to failure if we need to.
-        final Map<FlowFile, Relationship> destinationRelationships = new HashMap<>();
-
-        final DBCPService dbcpService = context.getProperty(CONNECTION_POOL).asControllerService(DBCPService.class);
-        try (final Connection conn = dbcpService.getConnection()) {
-            final boolean originalAutoCommit = conn.getAutoCommit();
-            try {
-                conn.setAutoCommit(false);
-
-                for (final FlowFile flowFile : flowFiles) {
-                    processedFlowFiles.add(flowFile);
-                    final String sql = getSQL(session, flowFile);
-
-                    // Get the appropriate PreparedStatement to use.
-                    final StatementFlowFileEnclosure enclosure;
-                    try {
-                        enclosure = getEnclosure(sql, conn, statementMap, obtainKeys, poll.isFragmentedTransaction());
-                    } catch (final SQLNonTransientException e) {
-                        getLogger().error("Failed to update database for {} due to {}; routing to failure", new Object[] {flowFile, e});
-                        destinationRelationships.put(flowFile, REL_FAILURE);
-                        continue;
-                    }
-
-                    final PreparedStatement stmt = enclosure.getStatement();
-
-                    // set the appropriate parameters on the statement.
-                    // NOTE(review): when this fails for a statement that is not shared through statementMap, the
-                    // statement is not closed here - presumably closing the connection releases it; confirm.
-                    try {
-                        setParameters(stmt, flowFile.getAttributes());
-                    } catch (final SQLException | ProcessException pe) {
-                        getLogger().error("Cannot update database for {} due to {}; routing to failure", new Object[] {flowFile, pe.toString()}, pe);
-                        destinationRelationships.put(flowFile, REL_FAILURE);
-                        continue;
-                    }
-
-                    // If we need to obtain keys, we cannot do so in a Batch Update. So we have to execute the statement and close it.
-                    if (obtainKeys) {
-                        try {
-                            // Execute the actual update.
-                            stmt.executeUpdate();
-
-                            // attempt to determine the key that was generated, if any. This is not supported by all
-                            // database vendors, so if we cannot determine the generated key (or if the statement is not an INSERT),
-                            // we will just move on without setting the attribute.
-                            FlowFile sentFlowFile = flowFile;
-                            final String generatedKey = determineGeneratedKey(stmt);
-                            if (generatedKey != null) {
-                                sentFlowFile = session.putAttribute(sentFlowFile, "sql.generated.key", generatedKey);
-                            }
-
-                            stmt.close();
-                            sentFlowFiles.add(sentFlowFile);
-                        } catch (final SQLNonTransientException e) {
-                            getLogger().error("Failed to update database for {} due to {}; routing to failure", new Object[] {flowFile, e});
-                            destinationRelationships.put(flowFile, REL_FAILURE);
-                            continue;
-                        } catch (final SQLException e) {
-                            getLogger().error("Failed to update database for {} due to {}; it is possible that retrying the operation will succeed, so routing to retry", new Object[] {flowFile, e});
-                            destinationRelationships.put(flowFile, REL_RETRY);
-                            continue;
-                        }
-                    } else {
-                        // We don't need to obtain keys. Just add the statement to the batch.
-                        stmt.addBatch();
-                        enclosure.addFlowFile(flowFile);
-                        enclosuresToExecute.add(enclosure);
-                    }
-                }
-
-                // If we are not trying to obtain the generated keys, we will have
-                // PreparedStatement's that have batches added to them. We need to execute each batch and close
-                // the PreparedStatement.
-                for (final StatementFlowFileEnclosure enclosure : enclosuresToExecute) {
-                    try {
-                        final PreparedStatement stmt = enclosure.getStatement();
-                        stmt.executeBatch();
-                        sentFlowFiles.addAll(enclosure.getFlowFiles());
-                    } catch (final BatchUpdateException e) {
-                        // If we get a BatchUpdateException, then we want to determine which FlowFile caused the failure,
-                        // and route that FlowFile to failure while routing those that finished processing to success and those
-                        // that have not yet been executed to retry. If the FlowFile was
-                        // part of a fragmented transaction, then we must roll back all updates for this connection, because
-                        // other statements may have been successful and been part of this transaction.
-                        final int[] reportedCounts = e.getUpdateCounts();
-                        // getUpdateCounts() may return null when the driver supplied no counts; treat that as "nothing reported".
-                        final int[] updateCounts = reportedCounts == null ? new int[0] : reportedCounts;
-                        final List<FlowFile> batchFlowFiles = enclosure.getFlowFiles();
-
-                        // A driver that stops at the first failure reports only the statements before it, so the offender
-                        // sits at index updateCounts.length. A driver that keeps going reports every statement and marks
-                        // the offender with EXECUTE_FAILED. The helper covers both cases.
-                        final int offendingFlowFileIndex = indexOfFirstBatchFailure(updateCounts);
-
-                        if (poll.isFragmentedTransaction()) {
-                            // There are potentially multiple statements for this one transaction. As a result,
-                            // we need to roll back the entire transaction and route all of the FlowFiles to failure.
-                            conn.rollback();
-                            // Guard the lookup: the index can equal the batch size if no failed statement can be identified.
-                            final FlowFile offendingFlowFile = offendingFlowFileIndex < batchFlowFiles.size() ? batchFlowFiles.get(offendingFlowFileIndex) : null;
-                            getLogger().error("Failed to update database due to a failed batch update. A total of {} FlowFiles are required for this transaction, so routing all to failure. "
-                                + "Offending FlowFile was {}, which caused the following error: {}", new Object[] {flowFiles.size(), offendingFlowFile, e});
-                            session.transfer(flowFiles, REL_FAILURE);
-                            return;
-                        }
-
-                        // In the presence of a BatchUpdateException, the driver has the option of either stopping when an error
-                        // occurs, or continuing. If it continues, then it must account for all statements in the batch and for
-                        // those that fail return a Statement.EXECUTE_FAILED for the number of rows updated.
-                        // So we will iterate over all of the update counts returned. If any is equal to Statement.EXECUTE_FAILED,
-                        // we will route the corresponding FlowFile to failure. Otherwise, the FlowFile will go to success
-                        // unless it has not yet been processed (its index in the List >= updateCounts.length).
-                        final int reportedCount = Math.min(updateCounts.length, batchFlowFiles.size());
-                        int failureCount = 0;
-                        int successCount = 0;
-                        int retryCount = 0;
-                        for (int i = 0; i < reportedCount; i++) {
-                            final int updateCount = updateCounts[i];
-                            final FlowFile flowFile = batchFlowFiles.get(i);
-                            if (updateCount == Statement.EXECUTE_FAILED) {
-                                destinationRelationships.put(flowFile, REL_FAILURE);
-                                failureCount++;
-                            } else {
-                                destinationRelationships.put(flowFile, REL_SUCCESS);
-                                successCount++;
-                            }
-                        }
-
-                        // Index of the first FlowFile whose statement was never executed.
-                        int firstUnexecutedIndex = reportedCount;
-                        if (failureCount == 0 && reportedCount < batchFlowFiles.size()) {
-                            // if no failures found, the driver decided not to execute the statements after the
-                            // failure, so route the first unreported one to failure.
-                            final FlowFile failedFlowFile = batchFlowFiles.get(reportedCount);
-                            destinationRelationships.put(failedFlowFile, REL_FAILURE);
-                            failureCount++;
-                            firstUnexecutedIndex = reportedCount + 1;
-                        }
-
-                        // Everything after the last accounted-for statement was not executed and may be retried.
-                        for (final FlowFile flowFile : batchFlowFiles.subList(firstUnexecutedIndex, batchFlowFiles.size())) {
-                            destinationRelationships.put(flowFile, REL_RETRY);
-                            retryCount++;
-                        }
-
-                        getLogger().error("Failed to update database due to a failed batch update. There were a total of {} FlowFiles that failed, {} that succeeded, "
-                            + "and {} that were not execute and will be routed to retry; ", new Object[] {failureCount, successCount, retryCount});
-                    } catch (final SQLNonTransientException e) {
-                        getLogger().error("Failed to update database for {} due to {}; routing to failure", new Object[] {enclosure.getFlowFiles(), e});
-
-                        for (final FlowFile flowFile : enclosure.getFlowFiles()) {
-                            destinationRelationships.put(flowFile, REL_FAILURE);
-                        }
-                        continue;
-                    } catch (final SQLException e) {
-                        getLogger().error("Failed to update database for {} due to {}; it is possible that retrying the operation will succeed, so routing to retry",
-                            new Object[] {enclosure.getFlowFiles(), e});
-
-                        for (final FlowFile flowFile : enclosure.getFlowFiles()) {
-                            destinationRelationships.put(flowFile, REL_RETRY);
-                        }
-                        continue;
-                    } finally {
-                        enclosure.getStatement().close();
-                    }
-                }
-            } finally {
-                // NOTE(review): this commit also runs when the block above exits with an exception or after the
-                // rollback of a fragmented transaction - confirm that committing partial work is intended.
-                try {
-                    conn.commit();
-                } finally {
-                    // make sure that we try to set the auto commit back to whatever it was.
-                    if (originalAutoCommit) {
-                        try {
-                            conn.setAutoCommit(originalAutoCommit);
-                        } catch (final SQLException se) {
-                            // best effort only; the connection is closed right after this block
-                        }
-                    }
-                }
-            }
-
-            // Determine the database URL
-            String url = "jdbc://unknown-host";
-            try {
-                url = conn.getMetaData().getURL();
-            } catch (final SQLException sqle) {
-                // keep the placeholder URL for the provenance event
-            }
-
-            // Emit a Provenance SEND event
-            final long transmissionMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
-            for (final FlowFile flowFile : sentFlowFiles) {
-                session.getProvenanceReporter().send(flowFile, url, transmissionMillis, true);
-            }
-
-            for (final FlowFile flowFile : sentFlowFiles) {
-                destinationRelationships.put(flowFile, REL_SUCCESS);
-            }
-        } catch (final SQLException e) {
-            // Failed FlowFiles are all of them that we have processed minus those that were successfully sent
-            // NOTE(review): failedFlowFiles aliases processedFlowFiles and retry aliases flowFiles, so both removeAll
-            // calls mutate the shared lists; as written, the sent FlowFiles remain in 'retry' - confirm this is intended.
-            final List<FlowFile> failedFlowFiles = processedFlowFiles;
-            failedFlowFiles.removeAll(sentFlowFiles);
-
-            // All FlowFiles yet to be processed is all FlowFiles minus those processed
-            final List<FlowFile> retry = flowFiles;
-            retry.removeAll(processedFlowFiles);
-
-            final Relationship rel;
-            if (e instanceof SQLNonTransientException) {
-                getLogger().error("Failed to update database for {} due to {}; routing to failure", new Object[] {failedFlowFiles, e});
-                rel = REL_FAILURE;
-            } else {
-                getLogger().error("Failed to update database for {} due to {}; it is possible that retrying the operation will succeed, so routing to retry", new Object[] {failedFlowFiles, e});
-                rel = REL_RETRY;
-            }
-
-            for (final FlowFile flowFile : failedFlowFiles) {
-                destinationRelationships.put(flowFile, rel);
-            }
-
-            for (final FlowFile flowFile : retry) {
-                destinationRelationships.put(flowFile, Relationship.SELF);
-            }
-        }
-
-        for (final Map.Entry<FlowFile, Relationship> entry : destinationRelationships.entrySet()) {
-            session.transfer(entry.getKey(), entry.getValue());
-        }
-    }
-
-    /**
-     * Locates the statement on which a failed batch update stopped.
-     *
-     * @param updateCounts the (non-null) update counts reported by a BatchUpdateException
-     * @return the index of the first entry equal to Statement.EXECUTE_FAILED, or <code>updateCounts.length</code>
-     *         if no entry is marked as failed
-     */
-    private static int indexOfFirstBatchFailure(final int[] updateCounts) {
-        for (int i = 0; i < updateCounts.length; i++) {
-            if (updateCounts[i] == Statement.EXECUTE_FAILED) {
-                return i;
-            }
-        }
-        return updateCounts.length;
-    }
-
-
-    /**
-     * Fetches the next group of FlowFiles to work on from the incoming queues.
-     *
-     * If Support Fragmented Transactions is true, the FlowFiles are selected by a TransactionalFlowFileFilter;
-     * otherwise up to Batch Size FlowFiles are taken. When the selected FlowFiles form a fragmented transaction
-     * and <code>determineRelationship</code> names a destination for them, they are transferred there right away
-     * (after being penalized if that destination is the processor itself) and <code>null</code> is returned.
-     * Otherwise the FlowFiles of a fragmented transaction are sorted by their fragment.index attribute.
-     *
-     * @param context the process context for determining properties
-     * @param session the process session for pulling flowfiles
-     * @return a FlowFilePoll containing the FlowFiles to process, or <code>null</code> if there is nothing to process
-     */
-    private FlowFilePoll pollFlowFiles(final ProcessContext context, final ProcessSession session) {
-        final boolean useTransactions = context.getProperty(SUPPORT_TRANSACTIONS).asBoolean();
-        final int batchSize = context.getProperty(BATCH_SIZE).asInteger();
-
-        // Choose how the FlowFiles are pulled: through the transactional filter or as a plain batch.
-        final List<FlowFile> polled;
-        final boolean fragmented;
-        if (useTransactions) {
-            final TransactionalFlowFileFilter fragmentFilter = new TransactionalFlowFileFilter();
-            polled = session.get(fragmentFilter);
-            fragmented = fragmentFilter.isFragmentedTransaction();
-        } else {
-            polled = session.get(batchSize);
-            fragmented = false;
-        }
-
-        if (polled.isEmpty()) {
-            return null;
-        }
-
-        // Nothing to verify or sort unless the FlowFiles make up a fragmented transaction.
-        if (!fragmented) {
-            return new FlowFilePoll(polled, fragmented);
-        }
-
-        final Relationship destination = determineRelationship(polled, context.getProperty(TRANSACTION_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS));
-        if (destination != null) {
-            if (destination == Relationship.SELF) {
-                // FlowFiles going back to the input queue are penalized; swap each one for its penalized version.
-                for (final ListIterator<FlowFile> cursor = polled.listIterator(); cursor.hasNext();) {
-                    final FlowFile penalized = session.penalize(cursor.next());
-                    cursor.remove();
-                    cursor.add(penalized);
-                }
-            }
-
-            session.transfer(polled, destination);
-            return null;
-        }
-
-        // Order the fragments by their fragment.index attribute.
-        final Comparator<FlowFile> byFragmentIndex = new Comparator<FlowFile>() {
-            @Override
-            public int compare(final FlowFile left, final FlowFile right) {
-                final int leftIndex = Integer.parseInt(left.getAttribute(FRAGMENT_INDEX_ATTR));
-                final int rightIndex = Integer.parseInt(right.getAttribute(FRAGMENT_INDEX_ATTR));
-                return Integer.compare(leftIndex, rightIndex);
-            }
-        };
-        Collections.sort(polled, byFragmentIndex);
-
-        return new FlowFilePoll(polled, fragmented);
-    }
-
-
-    /**
-     * Returns the key that was generated from the given statement, or <code>null</code> if no key
-     * was generated or it could not be determined. The ResultSet holding the generated keys is
-     * closed before this method returns.
-     *
-     * @param stmt the statement that generated a key
-     * @return the key that was generated from the given statement, or <code>null</code> if no key
-     *         was generated or it could not be determined.
-     */
-    private String determineGeneratedKey(final PreparedStatement stmt) {
-        String generatedKey = null;
-
-        // try-with-resources skips close() for a null resource, so a null result from getGeneratedKeys() is safe here.
-        try (final ResultSet generatedKeys = stmt.getGeneratedKeys()) {
-            if (generatedKeys != null && generatedKeys.next()) {
-                generatedKey = generatedKeys.getString(1);
-            }
-        } catch (final SQLException sqle) {
-            // This is not supported by all vendors. This is a best-effort approach.
-            // A failure while closing the ResultSet also lands here; a key that was already read is still returned.
-        }
-
-        return generatedKey;
-    }
-
-
-    /**
-     * Looks up or creates the StatementFlowFileEnclosure used to execute the given SQL statement.
-     *
-     * An enclosure already stored in the map for this SQL is reused. Newly created enclosures are stored in the
-     * map only when neither generated keys are requested nor a fragmented transaction is being processed.
-     *
-     * @param sql the SQL to execute
-     * @param conn the connection from which a PreparedStatement can be created
-     * @param stmtMap the existing map of SQL to PreparedStatements
-     * @param obtainKeys whether or not we need to obtain generated keys for INSERT statements
-     * @param fragmentedTransaction whether or not the SQL pertains to a fragmented transaction
-     *
-     * @return a StatementFlowFileEnclosure to use for executing the given SQL statement
-     *
-     * @throws SQLException if unable to create the appropriate PreparedStatement
-     */
-    private StatementFlowFileEnclosure getEnclosure(final String sql, final Connection conn, final Map<String, StatementFlowFileEnclosure> stmtMap,
-        final boolean obtainKeys, final boolean fragmentedTransaction) throws SQLException {
-        final StatementFlowFileEnclosure cached = stmtMap.get(sql);
-        if (cached != null) {
-            return cached;
-        }
-
-        if (obtainKeys) {
-            // Generated keys cannot be obtained from a Batch Update, so this statement is used once and
-            // closed by the caller; there is no point in remembering it in the map.
-            PreparedStatement keyedStmt = conn.prepareStatement(sql, Statement.RETURN_GENERATED_KEYS);
-            if (keyedStmt == null) {
-                // Passing Statement.RETURN_GENERATED_KEYS makes conn.prepareStatement return null in some
-                // cases (at least for DerbyDB); fall back to a statement that does not return keys.
-                keyedStmt = conn.prepareStatement(sql);
-            }
-            return new StatementFlowFileEnclosure(keyedStmt);
-        }
-
-        final StatementFlowFileEnclosure created = new StatementFlowFileEnclosure(conn.prepareStatement(sql));
-
-        // Statements of a fragmented transaction must not be shared through the map: if a transaction runs
-        // Stmt A, then Stmt B, then Stmt A again with other parameters, sharing would execute both uses of
-        // Stmt A before Stmt B and thereby change the order of the updates.
-        if (!fragmentedTransaction) {
-            stmtMap.put(sql, created);
-        }
-        return created;
-    }
-
-
-    /**
-     * Reads the complete content of the given FlowFile and returns it as the SQL text to execute.
-     * The content is decoded as UTF-8.
-     *
-     * @param session the session used to read the FlowFile's content
-     * @param flowFile the FlowFile whose content holds the SQL statement
-     *
-     * @return the FlowFile content decoded as a UTF-8 String
-     */
-    private String getSQL(final ProcessSession session, final FlowFile flowFile) {
-        // The buffer is sized to the FlowFile and filled completely by the callback below.
-        final byte[] content = new byte[(int) flowFile.getSize()];
-        session.read(flowFile, new InputStreamCallback() {
-            @Override
-            public void process(final InputStream rawIn) throws IOException {
-                StreamUtils.fillBuffer(rawIn, content);
-            }
-        });
-
-        return new String(content, StandardCharsets.UTF_8);
-    }
-
-
-    /**
-     * Sets all of the appropriate parameters on the given PreparedStatement, based on the given FlowFile attributes.
-     * Every attribute whose name matches <code>sql.args.N.type</code> declares the JDBC type of parameter N; the
-     * matching value is looked up in the <code>sql.args.N.value</code> attribute (a missing value is passed on as null).
-     *
-     * @param stmt the statement to set the parameters on
-     * @param attributes the attributes from which to derive parameter indices, values, and types
-     * @throws SQLException if the PreparedStatement throws a SQLException when the appropriate setter is called
-     * @throws ProcessException if a parameter index or JDBC type is not a valid int, or if a parameter value
-     *             cannot be converted into the data type declared for it
-     */
-    private void setParameters(final PreparedStatement stmt, final Map<String, String> attributes) throws SQLException {
-        for (final Map.Entry<String, String> entry : attributes.entrySet()) {
-            final String key = entry.getKey();
-            final Matcher matcher = SQL_TYPE_ATTRIBUTE_PATTERN.matcher(key);
-            if (!matcher.matches()) {
-                continue;
-            }
-
-            // The pattern only guarantees digits; a digit string that does not fit in an int would make
-            // Integer.parseInt throw an unchecked NumberFormatException, so report it as a ProcessException.
-            final int parameterIndex;
-            try {
-                parameterIndex = Integer.parseInt(matcher.group(1));
-            } catch (final NumberFormatException nfe) {
-                throw new ProcessException("The parameter index in the name of the " + key + " attribute is not a valid integer", nfe);
-            }
-
-            final String typeValue = entry.getValue();
-            final String invalidTypeMessage = "Value of the " + key + " attribute is '" + typeValue + "', which is not a valid JDBC numeral type";
-            if (!NUMBER_PATTERN.matcher(typeValue).matches()) {
-                throw new ProcessException(invalidTypeMessage);
-            }
-
-            final int jdbcType;
-            try {
-                jdbcType = Integer.parseInt(typeValue);
-            } catch (final NumberFormatException nfe) {
-                // All digits, but outside the int range.
-                throw new ProcessException(invalidTypeMessage, nfe);
-            }
-
-            final String valueAttrName = "sql.args." + parameterIndex + ".value";
-            final String parameterValue = attributes.get(valueAttrName);
-
-            try {
-                setParameter(stmt, valueAttrName, parameterIndex, parameterValue, jdbcType);
-            } catch (final NumberFormatException nfe) {
-                throw new ProcessException("The value of the " + valueAttrName + " is '" + parameterValue + "', which cannot be converted into the necessary data type", nfe);
-            }
-        }
-    }
-
-
-    /**
-     * Determines which relationship the given FlowFiles should go to, based on a transaction timing out or
-     * transaction information not being present. If the FlowFiles should be processed and not transferred
-     * to any particular relationship yet, will return <code>null</code>
-     *
-     * @param flowFiles the FlowFiles whose relationship is to be determined
-     * @param transactionTimeoutMillis the maximum amount of time (in milliseconds) that we should wait
-     *            for all FlowFiles in a transaction to be present before routing to failure; <code>null</code>
-     *            disables the timeout check
-     * @return the appropriate relationship to route the FlowFiles to, or <code>null</code> if the FlowFiles
-     *         should instead be processed
-     */
-    Relationship determineRelationship(final List<FlowFile> flowFiles, final Long transactionTimeoutMillis) {
-        // fragment.count agreed upon by the FlowFiles seen so far; 0 means "none seen yet".
-        int selectedNumFragments = 0;
-        // Tracks which fragment.index values have already been seen, to detect duplicates.
-        final BitSet bitSet = new BitSet();
-
-        for (final FlowFile flowFile : flowFiles) {
-            final String fragmentCount = flowFile.getAttribute(FRAGMENT_COUNT_ATTR);
-            if (fragmentCount == null && flowFiles.size() == 1) {
-                // A lone FlowFile without a fragment.count is simply processed on its own.
-                return null;
-            } else if (fragmentCount == null) {
-                getLogger().error("Cannot process {} because there are {} FlowFiles with the same fragment.identifier "
-                    + "attribute but not all FlowFiles have a fragment.count attribute; routing all to failure", new Object[] {flowFile, flowFiles.size()});
-                return REL_FAILURE;
-            }
-
-            final int numFragments;
-            try {
-                numFragments = Integer.parseInt(fragmentCount);
-            } catch (final NumberFormatException nfe) {
-                getLogger().error("Cannot process {} because the fragment.count attribute has a value of '{}', which is not an integer; "
-                    + "routing all FlowFiles with this fragment.identifier to failure", new Object[] {flowFile, fragmentCount});
-                return REL_FAILURE;
-            }
-
-            if (numFragments < 1) {
-                getLogger().error("Cannot process {} because the fragment.count attribute has a value of '{}', which is not a positive integer; "
-                    + "routing all FlowFiles with this fragment.identifier to failure", new Object[] {flowFile, fragmentCount});
-                return REL_FAILURE;
-            }
-
-            // Every FlowFile of the transaction must report the same fragment.count.
-            if (selectedNumFragments == 0) {
-                selectedNumFragments = numFragments;
-            } else if (numFragments != selectedNumFragments) {
-                getLogger().error("Cannot process {} because the fragment.count attribute has different values for different FlowFiles with the same fragment.identifier; "
-                    + "routing all FlowFiles with this fragment.identifier to failure", new Object[] {flowFile});
-                return REL_FAILURE;
-            }
-
-            final String fragmentIndex = flowFile.getAttribute(FRAGMENT_INDEX_ATTR);
-            if (fragmentIndex == null) {
-                getLogger().error("Cannot process {} because the fragment.index attribute is missing; "
-                    + "routing all FlowFiles with this fragment.identifier to failure", new Object[] {flowFile});
-                return REL_FAILURE;
-            }
-
-            final int idx;
-            try {
-                idx = Integer.parseInt(fragmentIndex);
-            } catch (final NumberFormatException nfe) {
-                getLogger().error("Cannot process {} because the fragment.index attribute has a value of '{}', which is not an integer; "
-                    + "routing all FlowFiles with this fragment.identifier to failure", new Object[] {flowFile, fragmentIndex});
-                return REL_FAILURE;
-            }
-
-            // An index of 0 is accepted; only negative values are rejected, and the message says so.
-            if (idx < 0) {
-                getLogger().error("Cannot process {} because the fragment.index attribute has a value of '{}', which is a negative integer; "
-                    + "routing all FlowFiles with this fragment.identifier to failure", new Object[] {flowFile, fragmentIndex});
-                return REL_FAILURE;
-            }
-
-            if (bitSet.get(idx)) {
-                getLogger().error("Cannot process {} because it has the same value for the fragment.index attribute as another FlowFile with the same fragment.identifier; "
-                    + "routing all FlowFiles with this fragment.identifier to failure", new Object[] {flowFile});
-                return REL_FAILURE;
-            }
-
-            bitSet.set(idx);
-        }
-
-        if (selectedNumFragments == flowFiles.size()) {
-            return null; // no relationship to route FlowFiles to yet - process the FlowFiles.
-        }
-
-        // The transaction is incomplete. Find the most recent time any of its FlowFiles was queued,
-        // so the timeout is measured from the latest arrival.
-        long latestQueueTime = 0L;
-        for (final FlowFile flowFile : flowFiles) {
-            if (flowFile.getLastQueueDate() != null && flowFile.getLastQueueDate() > latestQueueTime) {
-                latestQueueTime = flowFile.getLastQueueDate();
-            }
-        }
-
-        if (transactionTimeoutMillis != null) {
-            if (latestQueueTime > 0L && System.currentTimeMillis() - latestQueueTime > transactionTimeoutMillis) {
-                getLogger().error("The transaction timeout has expired for the following FlowFiles; they will be routed to failure: {}", new Object[] {flowFiles});
-                return REL_FAILURE;
-            }
-        }
-
-        getLogger().debug("Not enough FlowFiles for transaction. Returning all FlowFiles to queue");
-        return Relationship.SELF; // not enough FlowFiles for this transaction. Return them all to queue.
-    }
-
-    /**
-     * Determines how to map the given value to the appropriate JDBC data type and sets the parameter on the
-     * provided PreparedStatement. A <code>null</code> value is set via <code>setNull</code> with the given type.
-     * Supported types are BIT, BOOLEAN, TINYINT, SMALLINT, INTEGER, BIGINT, REAL, FLOAT, DOUBLE, DECIMAL,
-     * NUMERIC, DATE, TIME, TIMESTAMP (the latter three parsed as a long and passed to the corresponding
-     * java.sql constructor) and the character types CHAR, VARCHAR, LONGVARCHAR, NCHAR, NVARCHAR, LONGNVARCHAR.
-     *
-     * @param stmt the PreparedStatement to set the parameter on
-     * @param attrName the name of the attribute that the parameter is coming from - for logging purposes
-     * @param parameterIndex the index of the SQL parameter to set
-     * @param parameterValue the value of the SQL parameter to set
-     * @param jdbcType the JDBC Type of the SQL parameter to set
-     * @throws SQLException if the PreparedStatement throws a SQLException when calling the appropriate setter,
-     *             or if the JDBC type is not one of the supported types
-     * @throws NumberFormatException if the value cannot be parsed as the numeric type required by the JDBC type
-     */
-    private void setParameter(final PreparedStatement stmt, final String attrName, final int parameterIndex, final String parameterValue, final int jdbcType) throws SQLException {
-        if (parameterValue == null) {
-            stmt.setNull(parameterIndex, jdbcType);
-        } else {
-            switch (jdbcType) {
-                case Types.BIT:
-                case Types.BOOLEAN:
-                    stmt.setBoolean(parameterIndex, Boolean.parseBoolean(parameterValue));
-                    break;
-                case Types.TINYINT:
-                    stmt.setByte(parameterIndex, Byte.parseByte(parameterValue));
-                    break;
-                case Types.SMALLINT:
-                    stmt.setShort(parameterIndex, Short.parseShort(parameterValue));
-                    break;
-                case Types.INTEGER:
-                    stmt.setInt(parameterIndex, Integer.parseInt(parameterValue));
-                    break;
-                case Types.BIGINT:
-                    stmt.setLong(parameterIndex, Long.parseLong(parameterValue));
-                    break;
-                case Types.REAL:
-                    stmt.setFloat(parameterIndex, Float.parseFloat(parameterValue));
-                    break;
-                case Types.FLOAT:
-                case Types.DOUBLE:
-                    stmt.setDouble(parameterIndex, Double.parseDouble(parameterValue));
-                    break;
-                case Types.DECIMAL:
-                case Types.NUMERIC:
-                    // Fully qualified so that no additional import is required. The String constructor
-                    // throws NumberFormatException for malformed input, like the other numeric parsers here.
-                    stmt.setBigDecimal(parameterIndex, new java.math.BigDecimal(parameterValue));
-                    break;
-                case Types.DATE:
-                    stmt.setDate(parameterIndex, new Date(Long.parseLong(parameterValue)));
-                    break;
-                case Types.TIME:
-                    stmt.setTime(parameterIndex, new Time(Long.parseLong(parameterValue)));
-                    break;
-                case Types.TIMESTAMP:
-                    stmt.setTimestamp(parameterIndex, new Timestamp(Long.parseLong(parameterValue)));
-                    break;
-                case Types.CHAR:
-                case Types.VARCHAR:
-                case Types.LONGVARCHAR:
-                case Types.NCHAR:
-                case Types.NVARCHAR:
-                case Types.LONGNVARCHAR:
-                    stmt.setString(parameterIndex, parameterValue);
-                    break;
-                default:
-                    throw new SQLException("The '" + attrName + "' attribute has a value of '" + parameterValue
-                        + "' and a type of '" + jdbcType + "' but this is not a known data type");
-            }
-        }
-    }
-
-
-    /**
-     * A FlowFileFilter that is responsible for ensuring that the FlowFiles returned either belong
-     * to the same "fragmented transaction" (i.e., 1 transaction whose information is fragmented
-     * across multiple FlowFiles) or that none of the FlowFiles belongs to a fragmented transaction.
-     * A FlowFile counts as "not fragmented" when it has no fragment.identifier attribute or when its
-     * fragment.count attribute is exactly "1". The first FlowFile offered decides which mode is used.
-     */
-    static class TransactionalFlowFileFilter implements FlowFileFilter {
-        // fragment.identifier of the transaction being collected; null until one is chosen
-        private String selectedId = null;
-        // number of fragments accepted so far for selectedId, not counting a terminating one
-        private int numSelected = 0;
-        // true once the first accepted FlowFile turned out not to be part of a fragmented transaction
-        private boolean ignoreFragmentIdentifiers = false;
-
-        /**
-         * @return <code>true</code> unless the filter has switched to accepting only non-fragmented FlowFiles
-         */
-        public boolean isFragmentedTransaction() {
-            return !ignoreFragmentIdentifiers;
-        }
-
-        /**
-         * Interprets the fragment.count attribute. A missing value, a non-numeric value, or a value that
-         * does not fit into an int is treated as "unknown", represented by Integer.MAX_VALUE, so that the
-         * filter keeps accepting fragments instead of failing.
-         */
-        private static int parseFragmentCount(final String fragCount) {
-            if (fragCount == null || !NUMBER_PATTERN.matcher(fragCount).matches()) {
-                return Integer.MAX_VALUE;
-            }
-
-            try {
-                return Integer.parseInt(fragCount);
-            } catch (final NumberFormatException nfe) {
-                // all digits, but outside the int range
-                return Integer.MAX_VALUE;
-            }
-        }
-
-        @Override
-        public FlowFileFilterResult filter(final FlowFile flowFile) {
-            final String fragmentId = flowFile.getAttribute(FRAGMENT_ID_ATTR);
-            final String fragCount = flowFile.getAttribute(FRAGMENT_COUNT_ATTR);
-
-            // if first FlowFile selected is not part of a fragmented transaction, then
-            // we accept any FlowFile that is also not part of a fragmented transaction.
-            if (ignoreFragmentIdentifiers) {
-                if (fragmentId == null || "1".equals(fragCount)) {
-                    return FlowFileFilterResult.ACCEPT_AND_CONTINUE;
-                } else {
-                    return FlowFileFilterResult.REJECT_AND_CONTINUE;
-                }
-            }
-
-            if (fragmentId == null || "1".equals(fragCount)) {
-                if (selectedId == null) {
-                    // Only one FlowFile in the transaction.
-                    ignoreFragmentIdentifiers = true;
-                    return FlowFileFilterResult.ACCEPT_AND_CONTINUE;
-                } else {
-                    // we've already selected 1 FlowFile, and this one doesn't match.
-                    return FlowFileFilterResult.REJECT_AND_CONTINUE;
-                }
-            }
-
-            if (selectedId == null) {
-                // select this fragment id as the chosen one.
-                selectedId = fragmentId;
-                numSelected++;
-                return FlowFileFilterResult.ACCEPT_AND_CONTINUE;
-            }
-
-            if (selectedId.equals(fragmentId)) {
-                // fragment id's match. Find out if we have all of the necessary fragments or not.
-                // fragCount may be null here (fragment.identifier present, fragment.count absent),
-                // which the helper handles instead of throwing a NullPointerException.
-                final int numFragments = parseFragmentCount(fragCount);
-
-                if (numSelected >= numFragments - 1) {
-                    // We have all of the fragments we need for this transaction.
-                    return FlowFileFilterResult.ACCEPT_AND_TERMINATE;
-                } else {
-                    // We still need more fragments for this transaction, so accept this one and continue.
-                    numSelected++;
-                    return FlowFileFilterResult.ACCEPT_AND_CONTINUE;
-                }
-            } else {
-                return FlowFileFilterResult.REJECT_AND_CONTINUE;
-            }
-        }
-    }
-
-
-    /**
-     * Pairs a List of FlowFiles with a flag telling whether those FlowFiles make up a
-     * "fragmented transaction", i.e. a group of FlowFiles that must all be executed as one
-     * single transaction because the SQL and parameters of that transaction are spread
-     * (fragmented) across several FlowFiles. Both fields are assigned once, in the constructor.
-     */
-    private static class FlowFilePoll {
-        private final boolean fragmentedTransaction;
-        private final List<FlowFile> flowFiles;
-
-        /**
-         * @param flowFiles the FlowFiles that were polled
-         * @param fragmentedTransaction whether the FlowFiles form a fragmented transaction
-         */
-        public FlowFilePoll(final List<FlowFile> flowFiles, final boolean fragmentedTransaction) {
-            this.fragmentedTransaction = fragmentedTransaction;
-            this.flowFiles = flowFiles;
-        }
-
-        /**
-         * @return whether the polled FlowFiles form a fragmented transaction
-         */
-        public boolean isFragmentedTransaction() {
-            return this.fragmentedTransaction;
-        }
-
-        /**
-         * @return the polled FlowFiles, exactly as handed to the constructor
-         */
-        public List<FlowFile> getFlowFiles() {
-            return this.flowFiles;
-        }
-    }
-
-
-    /**
-     * A simple data structure to hold a Prepared Statement and the List of FlowFiles
-     * for which that statement should be evaluated. The statement is fixed at construction;
-     * FlowFiles are appended via {@link #addFlowFile(FlowFile)}. Equality and hash code are
-     * based solely on the statement.
-     */
-    private static class StatementFlowFileEnclosure {
-        private final PreparedStatement statement;
-        private final List<FlowFile> flowFiles = new ArrayList<>();
-
-        public StatementFlowFileEnclosure(final PreparedStatement statement) {
-            this.statement = statement;
-        }
-
-        public PreparedStatement getStatement() {
-            return statement;
-        }
-
-        /**
-         * @return the live (mutable) list of FlowFiles added so far
-         */
-        public List<FlowFile> getFlowFiles() {
-            return flowFiles;
-        }
-
-        public void addFlowFile(final FlowFile flowFile) {
-            this.flowFiles.add(flowFile);
-        }
-
-        @Override
-        public int hashCode() {
-            return statement.hashCode();
-        }
-
-        @Override
-        public boolean equals(final Object obj) {
-            // An object must equal itself (reflexivity); the previous implementation returned false here.
-            if (obj == this) {
-                return true;
-            }
-            // instanceof is false for null, so no separate null check is needed.
-            if (!(obj instanceof StatementFlowFileEnclosure)) {
-                return false;
-            }
-
-            final StatementFlowFileEnclosure other = (StatementFlowFileEnclosure) obj;
-            return statement.equals(other.getStatement());
-        }
-    }
-}
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.standard;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.charset.StandardCharsets;
+import java.sql.BatchUpdateException;
+import java.sql.Connection;
+import java.sql.Date;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.SQLNonTransientException;
+import java.sql.Statement;
+import java.sql.Time;
+import java.sql.Timestamp;
+import java.sql.Types;
+import java.util.ArrayList;
+import java.util.BitSet;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.ListIterator;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
+import org.apache.nifi.annotation.behavior.ReadsAttribute;
+import org.apache.nifi.annotation.behavior.ReadsAttributes;
+import org.apache.nifi.annotation.behavior.SupportsBatching;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.dbcp.DBCPService;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.FlowFileFilter;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.io.InputStreamCallback;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.stream.io.StreamUtils;
+
+@SupportsBatching
+@SeeAlso(ConvertJSONToSQL.class)
+@InputRequirement(Requirement.INPUT_REQUIRED)
+@Tags({"sql", "put", "rdbms", "database", "update", "insert", "relational"})
+@CapabilityDescription("Executes a SQL UPDATE or INSERT command. The content of an incoming FlowFile is expected to be the SQL command "
+    + "to execute. The SQL command may use the ? to escape parameters. In this case, the parameters to use must exist as FlowFile attributes "
+    + "with the naming convention sql.args.N.type and sql.args.N.value, where N is a positive integer. The sql.args.N.type is expected to be "
+    + "a number indicating the JDBC Type. The content of the FlowFile is expected to be in UTF-8 format.")
+@ReadsAttributes({
+    @ReadsAttribute(attribute = "fragment.identifier", description = "If the <Support Fragment Transactions> property is true, this attribute is used to determine whether or "
+        + "not two FlowFiles belong to the same transaction."),
+    @ReadsAttribute(attribute = "fragment.count", description = "If the <Support Fragment Transactions> property is true, this attribute is used to determine how many FlowFiles "
+        + "are needed to complete the transaction."),
+    @ReadsAttribute(attribute = "fragment.index", description = "If the <Support Fragment Transactions> property is true, this attribute is used to determine the order that the FlowFiles "
+        + "in a transaction should be evaluated."),
+    @ReadsAttribute(attribute = "sql.args.N.type", description = "Incoming FlowFiles are expected to be parameterized SQL statements. The type of each Parameter is specified as an integer "
+        + "that represents the JDBC Type of the parameter."),
+    @ReadsAttribute(attribute = "sql.args.N.value", description = "Incoming FlowFiles are expected to be parameterized SQL statements. The value of the Parameters are specified as "
+        + "sql.args.1.value, sql.args.2.value, sql.args.3.value, and so on. The type of the sql.args.1.value Parameter is specified by the sql.args.1.type attribute.")
+})
+@WritesAttributes({
+    @WritesAttribute(attribute = "sql.generated.key", description = "If the database generated a key for an INSERT statement and the Obtain Generated Keys property is set to true, "
+        + "this attribute will be added to indicate the generated key, if possible. This feature is not supported by all database vendors.")
+})
+public class PutSQL extends AbstractProcessor {
+
+    // Controller service that supplies the JDBC connections on which the SQL is executed.
+    static final PropertyDescriptor CONNECTION_POOL = new PropertyDescriptor.Builder()
+        .name("JDBC Connection Pool")
+        .description("Specifies the JDBC Connection Pool to use in order to obtain the database connection "
+            + "against which the SQL statements are executed.")
+        .identifiesControllerService(DBCPService.class)
+        .required(true)
+        .build();
+    // Whether FlowFiles sharing a fragment.identifier are executed together as one transaction.
+    static final PropertyDescriptor SUPPORT_TRANSACTIONS = new PropertyDescriptor.Builder()
+        .name("Support Fragmented Transactions")
+        .description("If true, when a FlowFile is consumed by this Processor, the Processor will first check the fragment.identifier and fragment.count attributes of that FlowFile. "
+            + "If the fragment.count value is greater than 1, the Processor will not process any FlowFile with that fragment.identifier until all are available; "
+            + "at that point, it will process all FlowFiles with that fragment.identifier as a single transaction, in the order specified by the FlowFiles' fragment.index attributes. "
+            + "This provides atomicity of those SQL statements. If this value is false, these attributes will be ignored and the updates will occur independent of one another.")
+        .allowableValues("true", "false")
+        .defaultValue("true")
+        .build();
+    // Optional time period to wait for the remaining fragments of a transaction.
+    static final PropertyDescriptor TRANSACTION_TIMEOUT = new PropertyDescriptor.Builder()
+        .name("Transaction Timeout")
+        .description("If the <Support Fragmented Transactions> property is set to true, specifies how long to wait for all FlowFiles for a particular fragment.identifier attribute "
+            + "to arrive before just transferring all of the FlowFiles with that identifier to the 'failure' relationship")
+        .required(false)
+        .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
+        .build();
+    // Preferred number of FlowFiles handled per database transaction; must be a positive integer.
+    static final PropertyDescriptor BATCH_SIZE = new PropertyDescriptor.Builder()
+        .name("Batch Size")
+        .description("The preferred number of FlowFiles to put to the database in a single transaction")
+        .required(true)
+        .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
+        .defaultValue("100")
+        .build();
+    // Whether database-generated keys are reported back in the sql.generated.key attribute.
+    static final PropertyDescriptor OBTAIN_GENERATED_KEYS = new PropertyDescriptor.Builder()
+        .name("Obtain Generated Keys")
+        .description("If true, any key that is automatically generated by the database will be added to the FlowFile that generated it using the sql.generated.key attribute. "
+            + "This may result in slightly slower performance and is not supported by all databases.")
+        .allowableValues("true", "false")
+        .defaultValue("false")
+        .build();
+
+    // Destination for FlowFiles whose SQL was applied to the database.
+    static final Relationship REL_SUCCESS = new Relationship.Builder()
+        .name("success")
+        .description("A FlowFile is routed to this relationship after the database is successfully updated")
+        .build();
+    // Destination for FlowFiles whose update failed in a way that may succeed if attempted again.
+    static final Relationship REL_RETRY = new Relationship.Builder()
+        .name("retry")
+        .description("A FlowFile is routed to this relationship if the database cannot be updated but attempting the operation again may succeed")
+        .build();
+    // Destination for FlowFiles whose update failed and for which a retry would fail as well.
+    static final Relationship REL_FAILURE = new Relationship.Builder()
+        .name("failure")
+        .description("A FlowFile is routed to this relationship if the database cannot be updated and retrying the operation will also fail, "
+            + "such as an invalid query or an integrity constraint violation")
+        .build();
+
+    // Matches attribute names of the form sql.args.N.type; group 1 captures the digits N.
+    private static final Pattern SQL_TYPE_ATTRIBUTE_PATTERN = Pattern.compile("sql\\.args\\.(\\d+)\\.type");
+    // Matches a run of decimal digits with an optional leading minus sign. A match does not
+    // guarantee that the value fits into an int.
+    private static final Pattern NUMBER_PATTERN = Pattern.compile("-?\\d+");
+
+    // Names of the FlowFile attributes that describe a fragmented transaction.
+    private static final String FRAGMENT_ID_ATTR = "fragment.identifier";
+    private static final String FRAGMENT_INDEX_ATTR = "fragment.index";
+    private static final String FRAGMENT_COUNT_ATTR = "fragment.count";
+
+    /**
+     * Returns a freshly built, mutable list of the properties this processor supports, in display order.
+     */
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        final List<PropertyDescriptor> descriptors = new ArrayList<>();
+        Collections.addAll(descriptors,
+            CONNECTION_POOL,
+            SUPPORT_TRANSACTIONS,
+            TRANSACTION_TIMEOUT,
+            BATCH_SIZE,
+            OBTAIN_GENERATED_KEYS);
+        return descriptors;
+    }
+
+    /**
+     * Returns a freshly built, mutable set holding the success, retry and failure relationships.
+     */
+    @Override
+    public Set<Relationship> getRelationships() {
+        final Set<Relationship> relationships = new HashSet<>();
+        Collections.addAll(relationships, REL_SUCCESS, REL_RETRY, REL_FAILURE);
+        return relationships;
+    }
+
+
+
+    @Override
+    public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
+        final FlowFilePoll poll = pollFlowFiles(context, session);
+        if (poll == null) {
+            return;
+        }
+
+        final List<FlowFile> flowFiles = poll.getFlowFiles();
+        if (flowFiles == null) {
+            return;
+        }
+
+        final long startNanos = System.nanoTime();
+        final boolean obtainKeys = context.getProperty(OBTAIN_GENERATED_KEYS).asBoolean();
+        final Map<String, StatementFlowFileEnclosure> statementMap = new HashMap<>(); // Map SQL to a PreparedStatement and FlowFiles
+        final List<FlowFile> sentFlowFiles = new ArrayList<>(); // flowfiles that have been sent
+        final List<FlowFile> processedFlowFiles = new ArrayList<>(); // all flowfiles that we have processed
+        final Set<StatementFlowFileEnclosure> enclosuresToExecute = new LinkedHashSet<>(); // the enclosures that we've processed
+
+        // Because we can have a transaction that is necessary across multiple FlowFiles, things get complicated when
+        // some FlowFiles have been transferred to a relationship and then there is a failure. As a result, we will just
+        // map all FlowFiles to their destination relationship and do the session.transfer at the end. This way, if there
+        // is a failure, we can route all FlowFiles to failure if we need to.
+        final Map<FlowFile, Relationship> destinationRelationships = new HashMap<>();
+
+        final DBCPService dbcpService = context.getProperty(CONNECTION_POOL).asControllerService(DBCPService.class);
+        try (final Connection conn = dbcpService.getConnection()) {
+            final boolean originalAutoCommit = conn.getAutoCommit();
+            try {
+                conn.setAutoCommit(false);
+
+                for (final FlowFile flowFile : flowFiles) {
+                    processedFlowFiles.add(flowFile);
+                    final String sql = getSQL(session, flowFile);
+
+                    // Get the appropriate PreparedStatement to use.
+                    final StatementFlowFileEnclosure enclosure;
+                    try {
+                        enclosure = getEnclosure(sql, conn, statementMap, obtainKeys, poll.isFragmentedTransaction());
+                    } catch (final SQLNonTransientException e) {
+                        getLogger().error("Failed to update database for {} due to {}; routing to failure", new Object[] {flowFile, e});
+                        destinationRelationships.put(flowFile, REL_FAILURE);
+                        continue;
+                    }
+
+                    final PreparedStatement stmt = enclosure.getStatement();
+
+                    // set the appropriate parameters on the statement.
+                    try {
+                        setParameters(stmt, flowFile.getAttributes());
+                    } catch (final SQLException | ProcessException pe) {
+                        getLogger().error("Cannot update database for {} due to {}; routing to failure", new Object[] {flowFile, pe.toString()}, pe);
+                        destinationRelationships.put(flowFile, REL_FAILURE);
+                        continue;
+                    }
+
+                    // If we need to obtain keys, we cannot do so in a a Batch Update. So we have to execute the statement and close it.
+                    if (obtainKeys) {
+                        try {
+                            // Execute the actual update.
+                            stmt.executeUpdate();
+
+                            // attempt to determine the key that was generated, if any. This is not supported by all
+                            // database vendors, so if we cannot determine the generated key (or if the statement is not an INSERT),
+                            // we will just move on without setting the attribute.
+                            FlowFile sentFlowFile = flowFile;
+                            final String generatedKey = determineGeneratedKey(stmt);
+                            if (generatedKey != null) {
+                                sentFlowFile = session.putAttribute(sentFlowFile, "sql.generated.key", generatedKey);
+                            }
+
+                            stmt.close();
+                            sentFlowFiles.add(sentFlowFile);
+                        } catch (final SQLNonTransientException e) {
+                            getLogger().error("Failed to update database for {} due to {}; routing to failure", new Object[] {flowFile, e});
+                            destinationRelationships.put(flowFile, REL_FAILURE);
+                            continue;
+                        } catch (final SQLException e) {
+                            getLogger().error("Failed to update database for {} due to {}; it is possible that retrying the operation will succeed, so routing to retry", new Object[] {flowFile, e});
+                            destinationRelationships.put(flowFile, REL_RETRY);
+                            continue;
+                        }
+                    } else {
+                        // We don't need to obtain keys. Just add the statement to the batch.
+                        stmt.addBatch();
+                        enclosure.addFlowFile(flowFile);
+                        enclosuresToExecute.add(enclosure);
+                    }
+                }
+
+                // If we are not trying to obtain the generated keys, we will have
+                // PreparedStatement's that have batches added to them. We need to execute each batch and close
+                // the PreparedStatement.
+                for (final StatementFlowFileEnclosure enclosure : enclosuresToExecute) {
+                    try {
+                        final PreparedStatement stmt = enclosure.getStatement();
+                        stmt.executeBatch();
+                        sentFlowFiles.addAll(enclosure.getFlowFiles());
+                    } catch (final BatchUpdateException e) {
+                        // If we get a BatchUpdateException, then we want to determine which FlowFile caused the failure,
+                        // and route that FlowFile to failure while routing those that finished processing to success and those
+                        // that have not yet been executed to retry. If the FlowFile was
+                        // part of a fragmented transaction, then we must roll back all updates for this connection, because
+                        // other statements may have been successful and been part of this transaction.
+                        final int[] updateCounts = e.getUpdateCounts();
+                        final int offendingFlowFileIndex = updateCounts.length;
+                        final List<FlowFile> batchFlowFiles = enclosure.getFlowFiles();
+
+                        if (poll.isFragmentedTransaction()) {
+                            // There are potentially multiple statements for this one transaction. As a result,
+                            // we need to roll back the entire transaction and route all of the FlowFiles to failure.
+                            conn.rollback();
+                            final FlowFile offendingFlowFile = batchFlowFiles.get(offendingFlowFileIndex);
+                            getLogger().error("Failed to update database due to a failed batch update. A total of {} FlowFiles are required for this transaction, so routing all to failure. "
+                                + "Offending FlowFile was {}, which caused the following error: {}", new Object[] {flowFiles.size(), offendingFlowFile, e});
+                            session.transfer(flowFiles, REL_FAILURE);
+                            return;
+                        }
+
+                        // In the presence of a BatchUpdateException, the driver has the option of either stopping when an error
+                        // occurs, or continuing. If it continues, then it must account for all statements in the batch and for
+                        // those that fail return a Statement.EXECUTE_FAILED for the number of rows updated.
+                        // So we will iterate over all of the update counts returned. If any is equal to Statement.EXECUTE_FAILED,
+                        // we will route the corresponding FlowFile to failure. Otherwise, the FlowFile will go to success
+                        // unless it has not yet been processed (its index in the List > updateCounts.length).
+                        int failureCount = 0;
+                        int successCount = 0;
+                        int retryCount = 0;
+                        for (int i = 0; i < updateCounts.length; i++) {
+                            final int updateCount = updateCounts[i];
+                            final FlowFile flowFile = batchFlowFiles.get(i);
+                            if (updateCount == Statement.EXECUTE_FAILED) {
+                                destinationRelationships.put(flowFile, REL_FAILURE);
+                                failureCount++;
+                            } else {
+                                destinationRelationships.put(flowFile, REL_SUCCESS);
+                                successCount++;
+                            }
+                        }
+
+                        if (failureCount == 0) {
+                            // if no failures found, the driver decided not to execute the statements after the
+                            // failure, so route the last one to failure.
+                            final FlowFile failedFlowFile = batchFlowFiles.get(updateCounts.length);
+                            destinationRelationships.put(failedFlowFile, REL_FAILURE);
+                            failureCount++;
+                        }
+
+                        if (updateCounts.length < batchFlowFiles.size()) {
+                            final List<FlowFile> unexecuted = batchFlowFiles.subList(updateCounts.length + 1, batchFlowFiles.size());
+                            for (final FlowFile flowFile : unexecuted) {
+                                destinationRelationships.put(flowFile, REL_RETRY);
+                                retryCount++;
+                            }
+                        }
+
+                        getLogger().error("Failed to update database due to a failed batch update. There were a total of {} FlowFiles that failed, {} that succeeded, "
+                            + "and {} that were not execute and will be routed to retry; ", new Object[] {failureCount, successCount, retryCount});
+                    } catch (final SQLNonTransientException e) {
+                        getLogger().error("Failed to update database for {} due to {}; routing to failure", new Object[] {enclosure.getFlowFiles(), e});
+
+                        for (final FlowFile flowFile : enclosure.getFlowFiles()) {
+                            destinationRelationships.put(flowFile, REL_FAILURE);
+                        }
+                        continue;
+                    } catch (final SQLException e) {
+                        getLogger().error("Failed to update database for {} due to {}; it is possible that retrying the operation will succeed, so routing to retry",
+                            new Object[] {enclosure.getFlowFiles(), e});
+
+                        for (final FlowFile flowFile : enclosure.getFlowFiles()) {
+                            destinationRelationships.put(flowFile, REL_RETRY);
+                        }
+                        continue;
+                    } finally {
+                        enclosure.getStatement().close();
+                    }
+                }
+            } finally {
+                try {
+                    conn.commit();
+                } finally {
+                    // make sure that we try to set the auto commit back to whatever it was.
+                    if (originalAutoCommit) {
+                        try {
+                            conn.setAutoCommit(originalAutoCommit);
+                        } catch (final SQLException se) {
+                        }
+                    }
+                }
+            }
+
+            // Determine the database URL
+            String url = "jdbc://unknown-host";
+            try {
+                url = conn.getMetaData().getURL();
+            } catch (final SQLException sqle) {
+            }
+
+            // Emit a Provenance SEND event
+            final long transmissionMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
+            for (final FlowFile flowFile : sentFlowFiles) {
+                session.getProvenanceReporter().send(flowFile, url, transmissionMillis, true);
+            }
+
+            for (final FlowFile flowFile : sentFlowFiles) {
+                destinationRelationships.put(flowFile, REL_SUCCESS);
+            }
+        } catch (final SQLException e) {
+            // Failed FlowFiles are all of them that we have processed minus those that were successfully sent
+            final List<FlowFile> failedFlowFiles = processedFlowFiles;
+            failedFlowFiles.removeAll(sentFlowFiles);
+
+            // All FlowFiles yet to be processed is all FlowFiles minus those processed
+            final List<FlowFile> retry = flowFiles;
+            retry.removeAll(processedFlowFiles);
+
+            final Relationship rel;
+            if (e instanceof SQLNonTransientException) {
+                getLogger().error("Failed to update database for {} due to {}; routing to failure", new Object[] {failedFlowFiles, e});
+                rel = REL_FAILURE;
+            } else {
+                getLogger().error("Failed to update database for {} due to {}; it is possible that retrying the operation will succeed, so routing to retry", new Object[] {failedFlowFiles, e});
+                rel = REL_RETRY;
+            }
+
+            for (final FlowFile flowFile : failedFlowFiles) {
+                destinationRelationships.put(flowFile, rel);
+            }
+
+            for (final FlowFile flowFile : retry) {
+                destinationRelationships.put(flowFile, Relationship.SELF);
+            }
+        }
+
+        for (final Map.Entry<FlowFile, Relationship> entry : destinationRelationships.entrySet()) {
+            session.transfer(entry.getKey(), entry.getValue());
+        }
+    }
+
+
+    /**
+     * Pulls the next batch of FlowFiles to process from the incoming queues.
+     *
+     * When the Support Fragmented Transactions property is true, FlowFiles are selected with a
+     * {@link TransactionalFlowFileFilter}; otherwise up to Batch Size FlowFiles are pulled.
+     *
+     * If the filter reports a fragmented transaction, the FlowFiles are checked via
+     * {@link #determineRelationship(List, Long)}. When that check names a relationship, every FlowFile
+     * is transferred to it (after being penalized if the relationship is {@link Relationship#SELF}) and
+     * <code>null</code> is returned. Otherwise the FlowFiles are sorted by their fragment index so that
+     * they are evaluated in order.
+     *
+     * @param context the process context for determining properties
+     * @param session the process session for pulling flowfiles
+     * @return a FlowFilePoll containing the FlowFiles to process, or <code>null</code> if nothing was pulled
+     *         or the pulled FlowFiles were transferred elsewhere instead of being processed
+     */
+    private FlowFilePoll pollFlowFiles(final ProcessContext context, final ProcessSession session) {
+        final boolean transactionsSupported = context.getProperty(SUPPORT_TRANSACTIONS).asBoolean();
+        final int batchSize = context.getProperty(BATCH_SIZE).asInteger();
+
+        // Choose how FlowFiles are obtained: through the transactional filter, or simply by batch size.
+        final List<FlowFile> polled;
+        final boolean fragmented;
+        if (transactionsSupported) {
+            final TransactionalFlowFileFilter txFilter = new TransactionalFlowFileFilter();
+            polled = session.get(txFilter);
+            fragmented = txFilter.isFragmentedTransaction();
+        } else {
+            polled = session.get(batchSize);
+            fragmented = false;
+        }
+
+        if (polled.isEmpty()) {
+            return null;
+        }
+
+        // Without a fragmented transaction there is nothing to verify or order.
+        if (!fragmented) {
+            return new FlowFilePoll(polled, false);
+        }
+
+        final Long timeoutMillis = context.getProperty(TRANSACTION_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS);
+        final Relationship destination = determineRelationship(polled, timeoutMillis);
+        if (destination != null) {
+            if (destination == Relationship.SELF) {
+                // FlowFiles going back to the incoming queue are penalized first; swap each entry
+                // for its penalized version so that the penalized FlowFiles are what get transferred.
+                for (final ListIterator<FlowFile> it = polled.listIterator(); it.hasNext();) {
+                    final FlowFile penalized = session.penalize(it.next());
+                    it.remove();
+                    it.add(penalized);
+                }
+            }
+
+            session.transfer(polled, destination);
+            return null;
+        }
+
+        // All fragments are usable; order them by fragment index.
+        Collections.sort(polled, new Comparator<FlowFile>() {
+            @Override
+            public int compare(final FlowFile left, final FlowFile right) {
+                final int leftIndex = Integer.parseInt(left.getAttribute(FRAGMENT_INDEX_ATTR));
+                final int rightIndex = Integer.parseInt(right.getAttribute(FRAGMENT_INDEX_ATTR));
+                return Integer.compare(leftIndex, rightIndex);
+            }
+        });
+
+        return new FlowFilePoll(polled, true);
+    }
+
+
+    /**
+     * Returns the key that was generated from the given statement, or <code>null</code> if no key
+     * was generated or it could not be determined.
+     *
+     * @param stmt the statement that generated a key
+     * @return the key that was generated from the given statement, or <code>null</code> if no key
+     *         was generated or it could not be determined.
+     */
+    private String determineGeneratedKey(final PreparedStatement stmt) {
+        try {
+            final ResultSet generatedKeys = stmt.getGeneratedKeys();
+            if (generatedKeys != null && generatedKeys.next()) {
+                return generatedKeys.getString(1);
+            }
+        } catch (final SQLException sqle) {
+            // This is not supported by all vendors. This is a best-effort approach.
+        }
+
+        return null;
+    }
+
+
+    /**
+     * Looks up or creates the StatementFlowFileEnclosure to use for executing the given SQL statement.
+     * An enclosure is cached in (and reused from) the given map only when generated keys are not needed
+     * and the SQL is not part of a fragmented transaction.
+     *
+     * @param sql the SQL to execute
+     * @param conn the connection from which a PreparedStatement can be created
+     * @param stmtMap the existing map of SQL to PreparedStatements
+     * @param obtainKeys whether or not we need to obtain generated keys for INSERT statements
+     * @param fragmentedTransaction whether or not the SQL pertains to a fragmented transaction
+     *
+     * @return a StatementFlowFileEnclosure to use for executing the given SQL statement
+     *
+     * @throws SQLException if unable to create the appropriate PreparedStatement
+     */
+    private StatementFlowFileEnclosure getEnclosure(final String sql, final Connection conn, final Map<String, StatementFlowFileEnclosure> stmtMap,
+        final boolean obtainKeys, final boolean fragmentedTransaction) throws SQLException {
+        final StatementFlowFileEnclosure cached = stmtMap.get(sql);
+        if (cached != null) {
+            return cached;
+        }
+
+        if (obtainKeys) {
+            // Generated keys cannot be obtained from a Batch Update, so this statement is used once and
+            // then closed; there is no reason to remember it in the Map.
+            final PreparedStatement keyedStmt = conn.prepareStatement(sql, Statement.RETURN_GENERATED_KEYS);
+
+            // When Statement.RETURN_GENERATED_KEYS is passed, conn.prepareStatement will in some cases
+            // (at least for DerbyDB) return null. Recompile without requesting the generated keys.
+            return new StatementFlowFileEnclosure(keyedStmt == null ? conn.prepareStatement(sql) : keyedStmt);
+        }
+
+        final StatementFlowFileEnclosure created = new StatementFlowFileEnclosure(conn.prepareStatement(sql));
+
+        // A transaction that spans multiple FlowFiles must not share statements through the Map.
+        // Because the Map is keyed by SQL, a transaction that executes Stmt A, then Stmt B, then Stmt A
+        // again with different parameters would have both executions of Stmt A evaluated before Stmt B,
+        // i.e. out of order. Only non-fragmented work is therefore registered for reuse.
+        if (!fragmentedTransaction) {
+            stmtMap.put(sql, created);
+        }
+
+        return created;
+    }
+
+
+    /**
+     * Determines the SQL statement that should be executed for the given FlowFile
+     *
+     * @param session the session that can be used to access the given FlowFile
+     * @param flowFile the FlowFile whose SQL statement should be executed
+     *
+     * @return the SQL that is associated with the given FlowFile
+     */
+    private String getSQL(final ProcessSession session, final FlowFile flowFile) {
+        // Read the SQL from the FlowFile's content
+        final byte[] buffer = new byte[(int) flowFile.getSize()];
+        session.read(flowFile, new InputStreamCallback() {
+            @Override
+            public void process(final InputStream in) throws IOException {
+                StreamUtils.fillBuffer(in, buffer);
+            }
+        });
+
+        // Create the PreparedStatement to use for this FlowFile.
+        final String sql = new String(buffer, StandardCharsets.UTF_8);
+        return sql;
+    }
+
+
+    /**
+     * Sets all of the appropriate parameters on the given PreparedStatement, based on the given FlowFile attributes.
+     * Every attribute whose name matches SQL_TYPE_ATTRIBUTE_PATTERN supplies the index (first capture group) and
+     * JDBC type (attribute value) of one parameter; the parameter's value is read from the corresponding
+     * <code>sql.args.N.value</code> attribute.
+     *
+     * @param stmt the statement to set the parameters on
+     * @param attributes the attributes from which to derive parameter indices, values, and types
+     * @throws SQLException if the PreparedStatement throws a SQLException when the appropriate setter is called
+     * @throws ProcessException if a parameter index or JDBC type is not a valid integer, or if a parameter value
+     *             cannot be converted into the necessary data type
+     */
+    private void setParameters(final PreparedStatement stmt, final Map<String, String> attributes) throws SQLException {
+        for (final Map.Entry<String, String> entry : attributes.entrySet()) {
+            final String key = entry.getKey();
+            final Matcher matcher = SQL_TYPE_ATTRIBUTE_PATTERN.matcher(key);
+            if (!matcher.matches()) {
+                continue;
+            }
+
+            // A digit-only index can still be too large for an int; report that as a ProcessException
+            // rather than letting an unchecked NumberFormatException escape to the caller.
+            final int parameterIndex;
+            try {
+                parameterIndex = Integer.parseInt(matcher.group(1));
+            } catch (final NumberFormatException nfe) {
+                throw new ProcessException("The parameter index in the " + key + " attribute cannot be converted into an integer", nfe);
+            }
+
+            final String typeValue = entry.getValue();
+            final boolean isNumeric = NUMBER_PATTERN.matcher(typeValue).matches();
+            if (!isNumeric) {
+                throw new ProcessException("Value of the " + key + " attribute is '" + typeValue + "', which is not a valid JDBC numeral type");
+            }
+
+            // Passing the pattern check does not guarantee that the value fits into an int.
+            final int jdbcType;
+            try {
+                jdbcType = Integer.parseInt(typeValue);
+            } catch (final NumberFormatException nfe) {
+                throw new ProcessException("Value of the " + key + " attribute is '" + typeValue + "', which is not a valid JDBC numeral type", nfe);
+            }
+
+            final String valueAttrName = "sql.args." + parameterIndex + ".value";
+            final String parameterValue = attributes.get(valueAttrName);
+
+            try {
+                setParameter(stmt, valueAttrName, parameterIndex, parameterValue, jdbcType);
+            } catch (final NumberFormatException nfe) {
+                throw new ProcessException("The value of the " + valueAttrName + " is '" + parameterValue + "', which cannot be converted into the necessary data type", nfe);
+            }
+        }
+    }
+
+
+    /**
+     * Determines which relationship the given FlowFiles should go to, based on a transaction timing out or
+     * transaction information not being present. If the FlowFiles should be processed and not transferred
+     * to any particular relationship yet, will return <code>null</code>
+     *
+     * @param flowFiles the FlowFiles whose relationship is to be determined
+     * @param transactionTimeoutMillis the maximum amount of time (in milliseconds) that we should wait
+     *            for all FlowFiles in a transaction to be present before routing to failure
+     * @return the appropriate relationship to route the FlowFiles to, or <code>null</code> if the FlowFiles
+     *         should instead be processed
+     */
+    Relationship determineRelationship(final List<FlowFile> flowFiles, final Long transactionTimeoutMillis) {
+        int selectedNumFragments = 0;
+        final BitSet bitSet = new BitSet();
+
+        for (final FlowFile flowFile : flowFiles) {
+            final String fragmentCount = flowFile.getAttribute(FRAGMENT_COUNT_ATTR);
+            if (fragmentCount == null && flowFiles.size() == 1) {
+                return null;
+            } else if (fragmentCount == null) {
+                getLogger().error("Cannot process {} because there are {} FlowFiles with the same fragment.identifier "
+                    + "attribute but not all FlowFiles have a fragment.count attribute; routing all to failure", new Object[] {flowFile, flowFiles.size()});
+                return REL_FAILURE;
+            }
+
+            final int numFragments;
+            try {
+                numFragments = Integer.parseInt(fragmentCount);
+            } catch (final NumberFormatException nfe) {
+                getLogger().error("Cannot process {} because the fragment.count attribute has a value of '{}', which is not an integer; "
+                    + "routing all FlowFiles with this fragment.identifier to failure", new Object[] {flowFile, fragmentCount});
+                return REL_FAILURE;
+            }
+
+            if (numFragments < 1) {
+                getLogger().error("Cannot process {} because the fragment.count attribute has a value of '{}', which is not a positive integer; "
+                    + "routing all FlowFiles with this fragment.identifier to failure", new Object[] {flowFile, fragmentCount});
+                return REL_FAILURE;
+            }
+
+            if (selectedNumFragments == 0) {
+                selectedNumFragments = numFragments;
+            } else if (numFragments != selectedNumFragments) {
+                getLogger().error("Cannot process {} because the fragment.count attribute has different values for different FlowFiles with the same fragment.identifier; "
+                    + "routing all FlowFiles with this fragment.identifier to failure", new Object[] {flowFile});
+                return REL_FAILURE;
+            }
+
+            final String fragmentIndex = flowFile.getAttribute(FRAGMENT_INDEX_ATTR);
+            if (fragmentIndex == null) {
+                getLogger().error("Cannot process {} because the fragment.index attribute is missing; "
+                    + "routing all FlowFiles with this fragment.identifier to failure", new Object[] {flowFile});
+                return REL_FAILURE;
+            }
+
+            final int idx;
+            try {
+                idx = Integer.parseInt(fragmentIndex);
+            } catch (final NumberFormatException nfe) {
+                getLogger().error("Cannot process {} because the fragment.index attribute has a value of '{}', which is not an integer; "
+                    + "routing all FlowFiles with this fragment.identifier to failure", new Object[] {flow

<TRUNCATED>