Posted to commits@nifi.apache.org by ma...@apache.org on 2017/04/27 17:56:55 UTC
[1/5] nifi git commit: NIFI-3415: Add Rollback on Failure.
Repository: nifi
Updated Branches:
refs/heads/master a1bffbcc8 -> d9acdb54b
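For context, this change adds a "Rollback On Failure" property (RollbackOnFailure.ROLLBACK_ON_FAILURE) to PutSQL and PutDatabaseRecord. When the property is enabled and a statement fails, the processor no longer routes the offending FlowFile to failure or retry; the database transaction and the process session are rolled back and a ProcessException is thrown, so the FlowFiles remain in the incoming queue. Below is a condensed sketch of the test pattern used further down in this patch; it assumes the Derby-backed DBCPService fixture ("service", registered as "dbcp") and the PERSONS_AI table that TestPutSQL already sets up, and the method name is hypothetical. Imports are the same ones the test diffs add (TestRunners, RollbackOnFailure, ProcessException, JUnit asserts).

    @Test
    public void rollbackOnFailureSketch() throws Exception {
        final TestRunner runner = TestRunners.newTestRunner(PutSQL.class);
        runner.addControllerService("dbcp", service); // 'service' is the existing DBCPService test fixture
        runner.enableControllerService(service);
        runner.setProperty(PutSQL.CONNECTION_POOL, "dbcp");
        runner.setProperty(RollbackOnFailure.ROLLBACK_ON_FAILURE, "true");

        runner.enqueue("INSERT INTO PERSONS_AI (NAME, CODE) VALUES ('Mark', 84)".getBytes());
        runner.enqueue("INSERT INTO PERSONS_AI".getBytes()); // intentionally wrong syntax

        try {
            runner.run();
            fail("ProcessException should be thrown");
        } catch (AssertionError e) {
            // Nothing is routed; the failure surfaces as a ProcessException and the
            // FlowFiles stay in the incoming queue for a later attempt.
            assertTrue(e.getCause() instanceof ProcessException);
            runner.assertTransferCount(PutSQL.REL_FAILURE, 0);
            runner.assertTransferCount(PutSQL.REL_SUCCESS, 0);
        }
    }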
http://git-wip-us.apache.org/repos/asf/nifi/blob/d9acdb54/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutSQL.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutSQL.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutSQL.java
index 6d7c504..cb3b198 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutSQL.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutSQL.java
@@ -26,18 +26,28 @@ import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.dbcp.DBCPService;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.FragmentAttributes;
-import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.AbstractSessionFactoryProcessor;
import org.apache.nifi.processor.FlowFileFilter;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.ProcessSessionFactory;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.io.InputStreamCallback;
import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.processor.util.pattern.ErrorTypes;
+import org.apache.nifi.processor.util.pattern.ExceptionHandler;
+import org.apache.nifi.processor.util.pattern.PartialFunctions;
+import org.apache.nifi.processor.util.pattern.PartialFunctions.FetchFlowFiles;
+import org.apache.nifi.processor.util.pattern.PartialFunctions.FlowFileGroup;
+import org.apache.nifi.processor.util.pattern.PutGroup;
+import org.apache.nifi.processor.util.pattern.RollbackOnFailure;
+import org.apache.nifi.processor.util.pattern.RoutingResult;
import org.apache.nifi.stream.io.StreamUtils;
import javax.xml.bind.DatatypeConverter;
@@ -52,6 +62,7 @@ import java.sql.Connection;
import java.sql.Date;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
+import java.sql.SQLDataException;
import java.sql.SQLException;
import java.sql.SQLNonTransientException;
import java.sql.Statement;
@@ -65,19 +76,19 @@ import java.time.format.DateTimeFormatter;
import java.time.temporal.TemporalAccessor;
import java.util.ArrayList;
import java.util.BitSet;
-import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
-import java.util.LinkedHashSet;
import java.util.List;
-import java.util.ListIterator;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
+import java.util.function.BiFunction;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
+import static org.apache.nifi.processor.util.pattern.ExceptionHandler.createOnError;
+
@SupportsBatching
@SeeAlso(ConvertJSONToSQL.class)
@InputRequirement(Requirement.INPUT_REQUIRED)
@@ -109,7 +120,7 @@ import java.util.regex.Pattern;
@WritesAttribute(attribute = "sql.generated.key", description = "If the database generated a key for an INSERT statement and the Obtain Generated Keys property is set to true, "
+ "this attribute will be added to indicate the generated key, if possible. This feature is not supported by all database vendors.")
})
-public class PutSQL extends AbstractProcessor {
+public class PutSQL extends AbstractSessionFactoryProcessor {
static final PropertyDescriptor CONNECTION_POOL = new PropertyDescriptor.Builder()
.name("JDBC Connection Pool")
@@ -180,6 +191,7 @@ public class PutSQL extends AbstractProcessor {
properties.add(TRANSACTION_TIMEOUT);
properties.add(BATCH_SIZE);
properties.add(OBTAIN_GENERATED_KEYS);
+ properties.add(RollbackOnFailure.ROLLBACK_ON_FAILURE);
return properties;
}
@@ -192,199 +204,176 @@ public class PutSQL extends AbstractProcessor {
return rels;
}
+ private static class FunctionContext extends RollbackOnFailure {
+ private boolean obtainKeys = false;
+ private boolean fragmentedTransaction = false;
+ private boolean originalAutoCommit = false;
+ private final long startNanos = System.nanoTime();
+ private FunctionContext(boolean rollbackOnFailure) {
+ super(rollbackOnFailure, true);
+ }
- @Override
- public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
- final FlowFilePoll poll = pollFlowFiles(context, session);
+ private boolean isSupportBatching() {
+ return !obtainKeys && !fragmentedTransaction;
+ }
+ }
+
+ private PutGroup<FunctionContext, Connection, StatementFlowFileEnclosure> process;
+ private BiFunction<FunctionContext, ErrorTypes, ErrorTypes.Result> adjustError;
+ private ExceptionHandler<FunctionContext> exceptionHandler;
+
+
+ private final FetchFlowFiles<FunctionContext> fetchFlowFiles = (c, s, fc, r) -> {
+ final FlowFilePoll poll = pollFlowFiles(c, s, fc, r);
if (poll == null) {
- return;
+ return null;
}
+ fc.fragmentedTransaction = poll.isFragmentedTransaction();
+ return poll.getFlowFiles();
+ };
- final List<FlowFile> flowFiles = poll.getFlowFiles();
- if (flowFiles == null) {
- return;
+ private final PartialFunctions.InitConnection<FunctionContext, Connection> initConnection = (c, s, fc) -> {
+ final Connection connection = c.getProperty(CONNECTION_POOL).asControllerService(DBCPService.class).getConnection();
+ try {
+ fc.originalAutoCommit = connection.getAutoCommit();
+ connection.setAutoCommit(false);
+ } catch (SQLException e) {
+ throw new ProcessException("Failed to disable auto commit due to " + e, e);
}
+ return connection;
+ };
- final long startNanos = System.nanoTime();
- final boolean obtainKeys = context.getProperty(OBTAIN_GENERATED_KEYS).asBoolean();
- final Map<String, StatementFlowFileEnclosure> statementMap = new HashMap<>(); // Map SQL to a PreparedStatement and FlowFiles
- final List<FlowFile> sentFlowFiles = new ArrayList<>(); // flowfiles that have been sent
- final List<FlowFile> processedFlowFiles = new ArrayList<>(); // all flowfiles that we have processed
- final Set<StatementFlowFileEnclosure> enclosuresToExecute = new LinkedHashSet<>(); // the enclosures that we've processed
- // Because we can have a transaction that is necessary across multiple FlowFiles, things get complicated when
- // some FlowFiles have been transferred to a relationship and then there is a failure. As a result, we will just
- // map all FlowFiles to their destination relationship and do the session.transfer at the end. This way, if there
- // is a failure, we can route all FlowFiles to failure if we need to.
- final Map<FlowFile, Relationship> destinationRelationships = new HashMap<>();
+ @FunctionalInterface
+ private interface GroupingFunction {
+ void apply(final ProcessContext context, final ProcessSession session, final FunctionContext fc,
+ final Connection conn, final List<FlowFile> flowFiles,
+ final List<StatementFlowFileEnclosure> groups,
+ final Map<String, StatementFlowFileEnclosure> sqlToEnclosure,
+ final RoutingResult result);
+ }
- final DBCPService dbcpService = context.getProperty(CONNECTION_POOL).asControllerService(DBCPService.class);
- try (final Connection conn = dbcpService.getConnection()) {
- final boolean originalAutoCommit = conn.getAutoCommit();
- try {
- conn.setAutoCommit(false);
-
- for (final FlowFile flowFile : flowFiles) {
- processedFlowFiles.add(flowFile);
- final String sql = getSQL(session, flowFile);
-
- // Get the appropriate PreparedStatement to use.
- final StatementFlowFileEnclosure enclosure;
- try {
- enclosure = getEnclosure(sql, conn, statementMap, obtainKeys, poll.isFragmentedTransaction());
- } catch (final SQLNonTransientException e) {
- getLogger().error("Failed to update database for {} due to {}; routing to failure", new Object[] {flowFile, e});
- destinationRelationships.put(flowFile, REL_FAILURE);
- continue;
- }
+ private GroupingFunction groupFragmentedTransaction = (context, session, fc, conn, flowFiles, groups, sqlToEnclosure, result) -> {
+ final FragmentedEnclosure fragmentedEnclosure = new FragmentedEnclosure();
+ groups.add(fragmentedEnclosure);
- final PreparedStatement stmt = enclosure.getStatement();
+ for (final FlowFile flowFile : flowFiles) {
+ final String sql = getSQL(session, flowFile);
- // set the appropriate parameters on the statement.
- try {
- setParameters(stmt, flowFile.getAttributes());
- } catch (final SQLException | ProcessException pe) {
- getLogger().error("Cannot update database for {} due to {}; routing to failure", new Object[] {flowFile, pe.toString()}, pe);
- destinationRelationships.put(flowFile, REL_FAILURE);
- continue;
- }
+ final StatementFlowFileEnclosure enclosure = sqlToEnclosure
+ .computeIfAbsent(sql, k -> new StatementFlowFileEnclosure(sql));
- // If we need to obtain keys, we cannot do so in a a Batch Update. So we have to execute the statement and close it.
- if (obtainKeys) {
- try {
- // Execute the actual update.
- stmt.executeUpdate();
-
- // attempt to determine the key that was generated, if any. This is not supported by all
- // database vendors, so if we cannot determine the generated key (or if the statement is not an INSERT),
- // we will just move on without setting the attribute.
- FlowFile sentFlowFile = flowFile;
- final String generatedKey = determineGeneratedKey(stmt);
- if (generatedKey != null) {
- sentFlowFile = session.putAttribute(sentFlowFile, "sql.generated.key", generatedKey);
- }
-
- stmt.close();
- sentFlowFiles.add(sentFlowFile);
- } catch (final SQLNonTransientException e) {
- getLogger().error("Failed to update database for {} due to {}; routing to failure", new Object[] {flowFile, e});
- destinationRelationships.put(flowFile, REL_FAILURE);
- continue;
- } catch (final SQLException e) {
- getLogger().error("Failed to update database for {} due to {}; it is possible that retrying the operation will succeed, so routing to retry", new Object[] {flowFile, e});
- destinationRelationships.put(flowFile, REL_RETRY);
- continue;
- }
- } else {
- // We don't need to obtain keys. Just add the statement to the batch.
- stmt.addBatch();
- enclosure.addFlowFile(flowFile);
- enclosuresToExecute.add(enclosure);
- }
- }
+ fragmentedEnclosure.addFlowFile(flowFile, enclosure);
+ }
+ };
- // If we are not trying to obtain the generated keys, we will have
- // PreparedStatement's that have batches added to them. We need to execute each batch and close
- // the PreparedStatement.
- for (final StatementFlowFileEnclosure enclosure : enclosuresToExecute) {
- try {
- final PreparedStatement stmt = enclosure.getStatement();
- stmt.executeBatch();
- sentFlowFiles.addAll(enclosure.getFlowFiles());
- } catch (final BatchUpdateException e) {
- // If we get a BatchUpdateException, then we want to determine which FlowFile caused the failure,
- // and route that FlowFile to failure while routing those that finished processing to success and those
- // that have not yet been executed to retry. If the FlowFile was
- // part of a fragmented transaction, then we must roll back all updates for this connection, because
- // other statements may have been successful and been part of this transaction.
- final int[] updateCounts = e.getUpdateCounts();
- final int offendingFlowFileIndex = updateCounts.length;
- final List<FlowFile> batchFlowFiles = enclosure.getFlowFiles();
-
- if (poll.isFragmentedTransaction()) {
- // There are potentially multiple statements for this one transaction. As a result,
- // we need to roll back the entire transaction and route all of the FlowFiles to failure.
- conn.rollback();
- final FlowFile offendingFlowFile = batchFlowFiles.get(offendingFlowFileIndex);
- getLogger().error("Failed to update database due to a failed batch update. A total of {} FlowFiles are required for this transaction, so routing all to failure. "
- + "Offending FlowFile was {}, which caused the following error: {}", new Object[] {flowFiles.size(), offendingFlowFile, e});
- session.transfer(flowFiles, REL_FAILURE);
- return;
- }
+ private final GroupingFunction groupFlowFilesBySQLBatch = (context, session, fc, conn, flowFiles, groups, sqlToEnclosure, result) -> {
+ for (final FlowFile flowFile : flowFiles) {
+ final String sql = getSQL(session, flowFile);
+
+ // Get or create the appropriate PreparedStatement to use.
+ final StatementFlowFileEnclosure enclosure = sqlToEnclosure
+ .computeIfAbsent(sql, k -> {
+ final StatementFlowFileEnclosure newEnclosure = new StatementFlowFileEnclosure(sql);
+ groups.add(newEnclosure);
+ return newEnclosure;
+ });
+
+ if(!exceptionHandler.execute(fc, flowFile, input -> {
+ final PreparedStatement stmt = enclosure.getCachedStatement(conn);
+ setParameters(stmt, flowFile.getAttributes());
+ stmt.addBatch();
+ }, onFlowFileError(context, session, result))) {
+ continue;
+ }
- // In the presence of a BatchUpdateException, the driver has the option of either stopping when an error
- // occurs, or continuing. If it continues, then it must account for all statements in the batch and for
- // those that fail return a Statement.EXECUTE_FAILED for the number of rows updated.
- // So we will iterate over all of the update counts returned. If any is equal to Statement.EXECUTE_FAILED,
- // we will route the corresponding FlowFile to failure. Otherwise, the FlowFile will go to success
- // unless it has not yet been processed (its index in the List > updateCounts.length).
- int failureCount = 0;
- int successCount = 0;
- int retryCount = 0;
- for (int i = 0; i < updateCounts.length; i++) {
- final int updateCount = updateCounts[i];
- final FlowFile flowFile = batchFlowFiles.get(i);
- if (updateCount == Statement.EXECUTE_FAILED) {
- destinationRelationships.put(flowFile, REL_FAILURE);
- failureCount++;
- } else {
- destinationRelationships.put(flowFile, REL_SUCCESS);
- successCount++;
- }
- }
+ enclosure.addFlowFile(flowFile);
+ }
+ };
- if (failureCount == 0) {
- // if no failures found, the driver decided not to execute the statements after the
- // failure, so route the last one to failure.
- final FlowFile failedFlowFile = batchFlowFiles.get(updateCounts.length);
- destinationRelationships.put(failedFlowFile, REL_FAILURE);
- failureCount++;
- }
+ private GroupingFunction groupFlowFilesBySQL = (context, session, fc, conn, flowFiles, groups, sqlToEnclosure, result) -> {
+ for (final FlowFile flowFile : flowFiles) {
+ final String sql = getSQL(session, flowFile);
- if (updateCounts.length < batchFlowFiles.size()) {
- final List<FlowFile> unexecuted = batchFlowFiles.subList(updateCounts.length + 1, batchFlowFiles.size());
- for (final FlowFile flowFile : unexecuted) {
- destinationRelationships.put(flowFile, REL_RETRY);
- retryCount++;
- }
- }
+ // Get or create the appropriate PreparedStatement to use.
+ final StatementFlowFileEnclosure enclosure = sqlToEnclosure
+ .computeIfAbsent(sql, k -> {
+ final StatementFlowFileEnclosure newEnclosure = new StatementFlowFileEnclosure(sql);
+ groups.add(newEnclosure);
+ return newEnclosure;
+ });
- getLogger().error("Failed to update database due to a failed batch update. There were a total of {} FlowFiles that failed, {} that succeeded, "
- + "and {} that were not execute and will be routed to retry; ", new Object[] {failureCount, successCount, retryCount});
- } catch (final SQLNonTransientException e) {
- getLogger().error("Failed to update database for {} due to {}; routing to failure", new Object[] {enclosure.getFlowFiles(), e});
+ enclosure.addFlowFile(flowFile);
+ }
+ };
+
+ final PutGroup.GroupFlowFiles<FunctionContext, Connection, StatementFlowFileEnclosure> groupFlowFiles = (context, session, fc, conn, flowFiles, result) -> {
+ final Map<String, StatementFlowFileEnclosure> sqlToEnclosure = new HashMap<>();
+ final List<StatementFlowFileEnclosure> groups = new ArrayList<>();
+
+ // There are three patterns:
+ // 1. Support batching: An enclosure has multiple FlowFiles being executed in a batch operation
+ // 2. Obtain keys: An enclosure has multiple FlowFiles, and each FlowFile is executed separately
+ // 3. Fragmented transaction: A single FragmentedEnclosure holds all FlowFiles, and each one is executed separately within the same transaction
+ if (fc.obtainKeys) {
+ groupFlowFilesBySQL.apply(context, session, fc, conn, flowFiles, groups, sqlToEnclosure, result);
+ } else if (fc.fragmentedTransaction) {
+ groupFragmentedTransaction.apply(context, session, fc, conn, flowFiles, groups, sqlToEnclosure, result);
+ } else {
+ groupFlowFilesBySQLBatch.apply(context, session, fc, conn, flowFiles, groups, sqlToEnclosure, result);
+ }
- for (final FlowFile flowFile : enclosure.getFlowFiles()) {
- destinationRelationships.put(flowFile, REL_FAILURE);
- }
- continue;
- } catch (final SQLException e) {
- getLogger().error("Failed to update database for {} due to {}; it is possible that retrying the operation will succeed, so routing to retry",
- new Object[] {enclosure.getFlowFiles(), e});
+ return groups;
+ };
- for (final FlowFile flowFile : enclosure.getFlowFiles()) {
- destinationRelationships.put(flowFile, REL_RETRY);
- }
- continue;
- } finally {
- enclosure.getStatement().close();
- }
+ final PutGroup.PutFlowFiles<FunctionContext, Connection, StatementFlowFileEnclosure> putFlowFiles = (context, session, fc, conn, enclosure, result) -> {
+
+ if (fc.isSupportBatching()) {
+
+ // We have PreparedStatements that have batches added to them.
+ // We need to execute each batch and close the PreparedStatement.
+ exceptionHandler.execute(fc, enclosure, input -> {
+ try (final PreparedStatement stmt = enclosure.getCachedStatement(conn)) {
+ stmt.executeBatch();
+ result.routeTo(enclosure.getFlowFiles(), REL_SUCCESS);
}
- } finally {
- try {
- conn.commit();
- } finally {
- // make sure that we try to set the auto commit back to whatever it was.
- if (originalAutoCommit) {
- try {
- conn.setAutoCommit(originalAutoCommit);
- } catch (final SQLException se) {
+ }, onBatchUpdateError(context, session, result));
+
+ } else {
+ for (final FlowFile flowFile : enclosure.getFlowFiles()) {
+
+ final StatementFlowFileEnclosure targetEnclosure
+ = enclosure instanceof FragmentedEnclosure
+ ? ((FragmentedEnclosure) enclosure).getTargetEnclosure(flowFile)
+ : enclosure;
+
+ // Execute update one by one.
+ exceptionHandler.execute(fc, flowFile, input -> {
+ try (final PreparedStatement stmt = targetEnclosure.getNewStatement(conn, fc.obtainKeys)) {
+
+ // set the appropriate parameters on the statement.
+ setParameters(stmt, flowFile.getAttributes());
+
+ stmt.executeUpdate();
+
+ // attempt to determine the key that was generated, if any. This is not supported by all
+ // database vendors, so if we cannot determine the generated key (or if the statement is not an INSERT),
+ // we will just move on without setting the attribute.
+ FlowFile sentFlowFile = flowFile;
+ final String generatedKey = determineGeneratedKey(stmt);
+ if (generatedKey != null) {
+ sentFlowFile = session.putAttribute(sentFlowFile, "sql.generated.key", generatedKey);
}
+
+ result.routeTo(sentFlowFile, REL_SUCCESS);
+
}
- }
+ }, onFlowFileError(context, session, result));
}
+ }
+ if (result.contains(REL_SUCCESS)) {
// Determine the database URL
String url = "jdbc://unknown-host";
try {
@@ -393,46 +382,167 @@ public class PutSQL extends AbstractProcessor {
}
// Emit a Provenance SEND event
- final long transmissionMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
- for (final FlowFile flowFile : sentFlowFiles) {
+ final long transmissionMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - fc.startNanos);
+ for (final FlowFile flowFile : result.getRoutedFlowFiles().get(REL_SUCCESS)) {
session.getProvenanceReporter().send(flowFile, url, transmissionMillis, true);
}
+ }
+ };
+
+ private ExceptionHandler.OnError<FunctionContext, FlowFile> onFlowFileError(final ProcessContext context, final ProcessSession session, final RoutingResult result) {
+ ExceptionHandler.OnError<FunctionContext, FlowFile> onFlowFileError = createOnError(context, session, result, REL_FAILURE, REL_RETRY);
+ onFlowFileError = onFlowFileError.andThen((c, i, r, e) -> {
+ switch (r.destination()) {
+ case Failure:
+ getLogger().error("Failed to update database for {} due to {}; routing to failure", new Object[] {i, e}, e);
+ break;
+ case Retry:
+ getLogger().error("Failed to update database for {} due to {}; it is possible that retrying the operation will succeed, so routing to retry",
+ new Object[] {i, e}, e);
+ break;
+ }
+ });
+ return RollbackOnFailure.createOnError(onFlowFileError);
+ }
+
+ private ExceptionHandler.OnError<FunctionContext, StatementFlowFileEnclosure> onBatchUpdateError(
+ final ProcessContext context, final ProcessSession session, final RoutingResult result) {
+ return RollbackOnFailure.createOnError((c, enclosure, r, e) -> {
+
+ // If rollbackOnFailure is enabled, the error will be thrown as ProcessException instead.
+ if (e instanceof BatchUpdateException && !c.isRollbackOnFailure()) {
+
+ // If we get a BatchUpdateException, then we want to determine which FlowFile caused the failure,
+ // and route that FlowFile to failure while routing those that finished processing to success and those
+ // that have not yet been executed to retry.
+ // Currently, fragmented transactions do not use batch updates.
+ final int[] updateCounts = ((BatchUpdateException) e).getUpdateCounts();
+ final List<FlowFile> batchFlowFiles = enclosure.getFlowFiles();
+
+ // In the presence of a BatchUpdateException, the driver has the option of either stopping when an error
+ // occurs, or continuing. If it continues, then it must account for all statements in the batch and for
+ // those that fail return a Statement.EXECUTE_FAILED for the number of rows updated.
+ // So we will iterate over all of the update counts returned. If any is equal to Statement.EXECUTE_FAILED,
+ // we will route the corresponding FlowFile to failure. Otherwise, the FlowFile will go to success
+ // unless it has not yet been processed (its index in the List > updateCounts.length).
+ int failureCount = 0;
+ int successCount = 0;
+ int retryCount = 0;
+ for (int i = 0; i < updateCounts.length; i++) {
+ final int updateCount = updateCounts[i];
+ final FlowFile flowFile = batchFlowFiles.get(i);
+ if (updateCount == Statement.EXECUTE_FAILED) {
+ result.routeTo(flowFile, REL_FAILURE);
+ failureCount++;
+ } else {
+ result.routeTo(flowFile, REL_SUCCESS);
+ successCount++;
+ }
+ }
+
+ if (failureCount == 0) {
+ // if no failures found, the driver decided not to execute the statements after the
+ // failure, so route the last one to failure.
+ final FlowFile failedFlowFile = batchFlowFiles.get(updateCounts.length);
+ result.routeTo(failedFlowFile, REL_FAILURE);
+ failureCount++;
+ }
+
+ if (updateCounts.length < batchFlowFiles.size()) {
+ final List<FlowFile> unexecuted = batchFlowFiles.subList(updateCounts.length + 1, batchFlowFiles.size());
+ for (final FlowFile flowFile : unexecuted) {
+ result.routeTo(flowFile, REL_RETRY);
+ retryCount++;
+ }
+ }
+
+ getLogger().error("Failed to update database due to a failed batch update. There were a total of {} FlowFiles that failed, {} that succeeded, "
+ + "and {} that were not execute and will be routed to retry; ", new Object[]{failureCount, successCount, retryCount});
+
+ return;
- for (final FlowFile flowFile : sentFlowFiles) {
- destinationRelationships.put(flowFile, REL_SUCCESS);
}
- } catch (final SQLException e) {
- // Failed FlowFiles are all of them that we have processed minus those that were successfully sent
- final List<FlowFile> failedFlowFiles = processedFlowFiles;
- failedFlowFiles.removeAll(sentFlowFiles);
- // All FlowFiles yet to be processed is all FlowFiles minus those processed
- final List<FlowFile> retry = flowFiles;
- retry.removeAll(processedFlowFiles);
+ // Apply default error handling and logging for other Exceptions.
+ ExceptionHandler.OnError<RollbackOnFailure, FlowFileGroup> onGroupError
+ = ExceptionHandler.createOnGroupError(context, session, result, REL_FAILURE, REL_RETRY);
+ onGroupError = onGroupError.andThen((cl, il, rl, el) -> {
+ switch (r.destination()) {
+ case Failure:
+ getLogger().error("Failed to update database for {} due to {}; routing to failure", new Object[] {il.getFlowFiles(), e}, e);
+ break;
+ case Retry:
+ getLogger().error("Failed to update database for {} due to {}; it is possible that retrying the operation will succeed, so routing to retry",
+ new Object[] {il.getFlowFiles(), e}, e);
+ break;
+ }
+ });
+ onGroupError.apply(c, enclosure, r, e);
+ });
+ }
+
+ @OnScheduled
+ public void constructProcess() {
+ process = new PutGroup<>();
- final Relationship rel;
- if (e instanceof SQLNonTransientException) {
- getLogger().error("Failed to update database for {} due to {}; routing to failure", new Object[] {failedFlowFiles, e});
- rel = REL_FAILURE;
- } else {
- getLogger().error("Failed to update database for {} due to {}; it is possible that retrying the operation will succeed, so routing to retry", new Object[] {failedFlowFiles, e});
- rel = REL_RETRY;
+ process.setLogger(getLogger());
+ process.fetchFlowFiles(fetchFlowFiles);
+ process.initConnection(initConnection);
+ process.groupFetchedFlowFiles(groupFlowFiles);
+ process.putFlowFiles(putFlowFiles);
+ process.adjustRoute(RollbackOnFailure.createAdjustRoute(REL_FAILURE, REL_RETRY));
+
+ process.onCompleted((c, s, fc, conn) -> {
+ try {
+ conn.commit();
+ } catch (SQLException e) {
+ // Throw ProcessException to rollback process session.
+ throw new ProcessException("Failed to commit database connection due to " + e, e);
}
+ });
- for (final FlowFile flowFile : failedFlowFiles) {
- destinationRelationships.put(flowFile, rel);
+ process.onFailed((c, s, fc, conn, e) -> {
+ try {
+ conn.rollback();
+ } catch (SQLException re) {
+ // Just log the fact that rollback failed.
+ // The ProcessSession will be rolled back by the thrown Exception, so there is nothing else to do here.
+ getLogger().warn("Failed to rollback database connection due to {}", new Object[]{re}, re);
}
+ });
- for (final FlowFile flowFile : retry) {
- destinationRelationships.put(flowFile, Relationship.SELF);
+ process.cleanup((c, s, fc, conn) -> {
+ // make sure that we try to set the auto commit back to whatever it was.
+ if (fc.originalAutoCommit) {
+ try {
+ conn.setAutoCommit(true);
+ } catch (final SQLException se) {
+ getLogger().warn("Failed to reset autocommit due to {}", new Object[]{se});
+ }
}
- }
+ });
- for (final Map.Entry<FlowFile, Relationship> entry : destinationRelationships.entrySet()) {
- session.transfer(entry.getKey(), entry.getValue());
- }
+ exceptionHandler = new ExceptionHandler<>();
+ exceptionHandler.mapException(e -> {
+ if (e instanceof SQLNonTransientException) {
+ return ErrorTypes.InvalidInput;
+ } else if (e instanceof SQLException) {
+ return ErrorTypes.TemporalFailure;
+ } else {
+ return ErrorTypes.UnknownFailure;
+ }
+ });
+ adjustError = RollbackOnFailure.createAdjustError(getLogger());
+ exceptionHandler.adjustError(adjustError);
}
+ @Override
+ public void onTrigger(ProcessContext context, ProcessSessionFactory sessionFactory) throws ProcessException {
+ final Boolean rollbackOnFailure = context.getProperty(RollbackOnFailure.ROLLBACK_ON_FAILURE).asBoolean();
+ final FunctionContext functionContext = new FunctionContext(rollbackOnFailure);
+ functionContext.obtainKeys = context.getProperty(OBTAIN_GENERATED_KEYS).asBoolean();
+ RollbackOnFailure.onTrigger(context, sessionFactory, functionContext, getLogger(), session -> process.onTrigger(context, session, functionContext));
+ }
/**
* Pulls a batch of FlowFiles from the incoming queues. If no FlowFiles are available, returns <code>null</code>.
@@ -448,7 +558,8 @@ public class PutSQL extends AbstractProcessor {
* @param session the process session for pulling flowfiles
* @return a FlowFilePoll containing a List of FlowFiles to process, or <code>null</code> if there are no FlowFiles to process
*/
- private FlowFilePoll pollFlowFiles(final ProcessContext context, final ProcessSession session) {
+ private FlowFilePoll pollFlowFiles(final ProcessContext context, final ProcessSession session,
+ final FunctionContext functionContext, final RoutingResult result) {
// Determine which FlowFile Filter to use in order to obtain FlowFiles.
final boolean useTransactions = context.getProperty(SUPPORT_TRANSACTIONS).asBoolean();
boolean fragmentedTransaction = false;
@@ -469,31 +580,23 @@ public class PutSQL extends AbstractProcessor {
// If we are supporting fragmented transactions, verify that all FlowFiles are correct
if (fragmentedTransaction) {
- final Relationship relationship = determineRelationship(flowFiles, context.getProperty(TRANSACTION_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS));
- if (relationship != null) {
- // if transferring back to self, penalize the FlowFiles.
- if (relationship == Relationship.SELF) {
- // penalize all of the FlowFiles that we are going to route to SELF.
- final ListIterator<FlowFile> itr = flowFiles.listIterator();
- while (itr.hasNext()) {
- final FlowFile flowFile = itr.next();
- final FlowFile penalized = session.penalize(flowFile);
- itr.remove();
- itr.add(penalized);
- }
+ try {
+ if (!isFragmentedTransactionReady(flowFiles, context.getProperty(TRANSACTION_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS))) {
+ // Not ready, penalize the FlowFiles and route them back to self.
+ flowFiles.forEach(f -> result.routeTo(session.penalize(f), Relationship.SELF));
+ return null;
}
- session.transfer(flowFiles, relationship);
+ } catch (IllegalArgumentException e) {
+ // Map the error to a relationship based on context, then let the default group-error handler route it.
+ final ErrorTypes.Result adjustedRoute = adjustError.apply(functionContext, ErrorTypes.InvalidInput);
+ ExceptionHandler.createOnGroupError(context, session, result, REL_FAILURE, REL_RETRY)
+ .apply(functionContext, () -> flowFiles, adjustedRoute, e);
return null;
}
// sort by fragment index.
- Collections.sort(flowFiles, new Comparator<FlowFile>() {
- @Override
- public int compare(final FlowFile o1, final FlowFile o2) {
- return Integer.compare(Integer.parseInt(o1.getAttribute(FRAGMENT_INDEX_ATTR)), Integer.parseInt(o2.getAttribute(FRAGMENT_INDEX_ATTR)));
- }
- });
+ flowFiles.sort(Comparator.comparing(o -> Integer.parseInt(o.getAttribute(FRAGMENT_INDEX_ATTR))));
}
return new FlowFilePoll(flowFiles, fragmentedTransaction);
@@ -521,63 +624,6 @@ public class PutSQL extends AbstractProcessor {
return null;
}
-
- /**
- * Returns the StatementFlowFileEnclosure that should be used for executing the given SQL statement
- *
- * @param sql the SQL to execute
- * @param conn the connection from which a PreparedStatement can be created
- * @param stmtMap the existing map of SQL to PreparedStatements
- * @param obtainKeys whether or not we need to obtain generated keys for INSERT statements
- * @param fragmentedTransaction whether or not the SQL pertains to a fragmented transaction
- *
- * @return a StatementFlowFileEnclosure to use for executing the given SQL statement
- *
- * @throws SQLException if unable to create the appropriate PreparedStatement
- */
- private StatementFlowFileEnclosure getEnclosure(final String sql, final Connection conn, final Map<String, StatementFlowFileEnclosure> stmtMap,
- final boolean obtainKeys, final boolean fragmentedTransaction) throws SQLException {
- StatementFlowFileEnclosure enclosure = stmtMap.get(sql);
- if (enclosure != null) {
- return enclosure;
- }
-
- if (obtainKeys) {
- // Create a new Prepared Statement, requesting that it return the generated keys.
- PreparedStatement stmt = conn.prepareStatement(sql, Statement.RETURN_GENERATED_KEYS);
-
- if (stmt == null) {
- // since we are passing Statement.RETURN_GENERATED_KEYS, calls to conn.prepareStatement will
- // in some cases (at least for DerbyDB) return null.
- // We will attempt to recompile the statement without the generated keys being returned.
- stmt = conn.prepareStatement(sql);
- }
-
- // If we need to obtain keys, then we cannot do a Batch Update. In this case,
- // we don't need to store the PreparedStatement in the Map because we aren't
- // doing an addBatch/executeBatch. Instead, we will use the statement once
- // and close it.
- return new StatementFlowFileEnclosure(stmt);
- } else if (fragmentedTransaction) {
- // We cannot use Batch Updates if we have a transaction that spans multiple FlowFiles.
- // If we did, we could end up processing the statements out of order. It's quite possible
- // that we could refactor the code some to allow for this, but as it is right now, this
- // could cause problems. This is because we have a Map<String, StatementFlowFileEnclosure>.
- // If we had a transaction that needed to execute Stmt A with some parameters, then Stmt B with
- // some parameters, then Stmt A with different parameters, this would become problematic because
- // the executeUpdate would be evaluated first for Stmt A (the 1st and 3rd statements, and then
- // the second statement would be evaluated).
- final PreparedStatement stmt = conn.prepareStatement(sql);
- return new StatementFlowFileEnclosure(stmt);
- }
-
- final PreparedStatement stmt = conn.prepareStatement(sql);
- enclosure = new StatementFlowFileEnclosure(stmt);
- stmtMap.put(sql, enclosure);
- return enclosure;
- }
-
-
/**
* Determines the SQL statement that should be executed for the given FlowFile
*
@@ -618,7 +664,7 @@ public class PutSQL extends AbstractProcessor {
final boolean isNumeric = NUMBER_PATTERN.matcher(entry.getValue()).matches();
if (!isNumeric) {
- throw new ProcessException("Value of the " + key + " attribute is '" + entry.getValue() + "', which is not a valid JDBC numeral type");
+ throw new SQLDataException("Value of the " + key + " attribute is '" + entry.getValue() + "', which is not a valid JDBC numeral type");
}
final int jdbcType = Integer.parseInt(entry.getValue());
@@ -630,11 +676,11 @@ public class PutSQL extends AbstractProcessor {
try {
setParameter(stmt, valueAttrName, parameterIndex, parameterValue, jdbcType, parameterFormat);
} catch (final NumberFormatException nfe) {
- throw new ProcessException("The value of the " + valueAttrName + " is '" + parameterValue + "', which cannot be converted into the necessary data type", nfe);
+ throw new SQLDataException("The value of the " + valueAttrName + " is '" + parameterValue + "', which cannot be converted into the necessary data type", nfe);
} catch (ParseException pe) {
- throw new ProcessException("The value of the " + valueAttrName + " is '" + parameterValue + "', which cannot be converted to a timestamp", pe);
+ throw new SQLDataException("The value of the " + valueAttrName + " is '" + parameterValue + "', which cannot be converted to a timestamp", pe);
} catch (UnsupportedEncodingException uee) {
- throw new ProcessException("The value of the " + valueAttrName + " is '" + parameterValue + "', which cannot be converted to UTF-8", uee);
+ throw new SQLDataException("The value of the " + valueAttrName + " is '" + parameterValue + "', which cannot be converted to UTF-8", uee);
}
}
}
@@ -652,76 +698,69 @@ public class PutSQL extends AbstractProcessor {
* @return the appropriate relationship to route the FlowFiles to, or <code>null</code> if the FlowFiles
* should instead be processed
*/
- Relationship determineRelationship(final List<FlowFile> flowFiles, final Long transactionTimeoutMillis) {
+ boolean isFragmentedTransactionReady(final List<FlowFile> flowFiles, final Long transactionTimeoutMillis) throws IllegalArgumentException {
int selectedNumFragments = 0;
final BitSet bitSet = new BitSet();
+ BiFunction<String, Object[], IllegalArgumentException> illegal = (s, objects) -> new IllegalArgumentException(String.format(s, objects));
+
for (final FlowFile flowFile : flowFiles) {
final String fragmentCount = flowFile.getAttribute(FRAGMENT_COUNT_ATTR);
if (fragmentCount == null && flowFiles.size() == 1) {
- return null;
+ return true;
} else if (fragmentCount == null) {
- getLogger().error("Cannot process {} because there are {} FlowFiles with the same fragment.identifier "
- + "attribute but not all FlowFiles have a fragment.count attribute; routing all to failure", new Object[] {flowFile, flowFiles.size()});
- return REL_FAILURE;
+ throw illegal.apply("Cannot process %s because there are %d FlowFiles with the same fragment.identifier "
+ + "attribute but not all FlowFiles have a fragment.count attribute", new Object[] {flowFile, flowFiles.size()});
}
final int numFragments;
try {
numFragments = Integer.parseInt(fragmentCount);
} catch (final NumberFormatException nfe) {
- getLogger().error("Cannot process {} because the fragment.count attribute has a value of '{}', which is not an integer; "
- + "routing all FlowFiles with this fragment.identifier to failure", new Object[] {flowFile, fragmentCount});
- return REL_FAILURE;
+ throw illegal.apply("Cannot process %s because the fragment.count attribute has a value of '%s', which is not an integer",
+ new Object[] {flowFile, fragmentCount});
}
if (numFragments < 1) {
- getLogger().error("Cannot process {} because the fragment.count attribute has a value of '{}', which is not a positive integer; "
- + "routing all FlowFiles with this fragment.identifier to failure", new Object[] {flowFile, fragmentCount});
- return REL_FAILURE;
+ throw illegal.apply("Cannot process %s because the fragment.count attribute has a value of '%s', which is not a positive integer",
+ new Object[] {flowFile, fragmentCount});
}
if (selectedNumFragments == 0) {
selectedNumFragments = numFragments;
} else if (numFragments != selectedNumFragments) {
- getLogger().error("Cannot process {} because the fragment.count attribute has different values for different FlowFiles with the same fragment.identifier; "
- + "routing all FlowFiles with this fragment.identifier to failure", new Object[] {flowFile});
- return REL_FAILURE;
+ throw illegal.apply("Cannot process %s because the fragment.count attribute has different values for different FlowFiles with the same fragment.identifier",
+ new Object[] {flowFile});
}
final String fragmentIndex = flowFile.getAttribute(FRAGMENT_INDEX_ATTR);
if (fragmentIndex == null) {
- getLogger().error("Cannot process {} because the fragment.index attribute is missing; "
- + "routing all FlowFiles with this fragment.identifier to failure", new Object[] {flowFile});
- return REL_FAILURE;
+ throw illegal.apply("Cannot process %s because the fragment.index attribute is missing", new Object[] {flowFile});
}
final int idx;
try {
idx = Integer.parseInt(fragmentIndex);
} catch (final NumberFormatException nfe) {
- getLogger().error("Cannot process {} because the fragment.index attribute has a value of '{}', which is not an integer; "
- + "routing all FlowFiles with this fragment.identifier to failure", new Object[] {flowFile, fragmentIndex});
- return REL_FAILURE;
+ throw illegal.apply("Cannot process %s because the fragment.index attribute has a value of '%s', which is not an integer",
+ new Object[] {flowFile, fragmentIndex});
}
if (idx < 0) {
- getLogger().error("Cannot process {} because the fragment.index attribute has a value of '{}', which is not a positive integer; "
- + "routing all FlowFiles with this fragment.identifier to failure", new Object[] {flowFile, fragmentIndex});
- return REL_FAILURE;
+ throw illegal.apply("Cannot process %s because the fragment.index attribute has a value of '%s', which is not a positive integer",
+ new Object[] {flowFile, fragmentIndex});
}
if (bitSet.get(idx)) {
- getLogger().error("Cannot process {} because it has the same value for the fragment.index attribute as another FlowFile with the same fragment.identifier; "
- + "routing all FlowFiles with this fragment.identifier to failure", new Object[] {flowFile});
- return REL_FAILURE;
+ throw illegal.apply("Cannot process %s because it has the same value for the fragment.index attribute as another FlowFile with the same fragment.identifier",
+ new Object[] {flowFile});
}
bitSet.set(idx);
}
if (selectedNumFragments == flowFiles.size()) {
- return null; // no relationship to route FlowFiles to yet - process the FlowFiles.
+ return true; // no relationship to route FlowFiles to yet - process the FlowFiles.
}
long latestQueueTime = 0L;
@@ -733,13 +772,12 @@ public class PutSQL extends AbstractProcessor {
if (transactionTimeoutMillis != null) {
if (latestQueueTime > 0L && System.currentTimeMillis() - latestQueueTime > transactionTimeoutMillis) {
- getLogger().error("The transaction timeout has expired for the following FlowFiles; they will be routed to failure: {}", new Object[] {flowFiles});
- return REL_FAILURE;
+ throw illegal.apply("The transaction timeout has expired for the following FlowFiles; they will be routed to failure: %s", new Object[] {flowFiles});
}
}
getLogger().debug("Not enough FlowFiles for transaction. Returning all FlowFiles to queue");
- return Relationship.SELF; // not enough FlowFiles for this transaction. Return them all to queue.
+ return false; // not enough FlowFiles for this transaction. Return them all to queue.
}
/**
@@ -924,7 +962,7 @@ public class PutSQL extends AbstractProcessor {
if (selectedId.equals(fragmentId)) {
// fragment id's match. Find out if we have all of the necessary fragments or not.
final int numFragments;
- if (NUMBER_PATTERN.matcher(fragCount).matches()) {
+ if (fragCount != null && NUMBER_PATTERN.matcher(fragCount).matches()) {
numFragments = Integer.parseInt(fragCount);
} else {
numFragments = Integer.MAX_VALUE;
@@ -971,22 +1009,69 @@ public class PutSQL extends AbstractProcessor {
}
+ private static class FragmentedEnclosure extends StatementFlowFileEnclosure {
+
+ private final Map<FlowFile, StatementFlowFileEnclosure> flowFileToEnclosure = new HashMap<>();
+
+ public FragmentedEnclosure() {
+ super(null);
+ }
+
+ public void addFlowFile(final FlowFile flowFile, final StatementFlowFileEnclosure enclosure) {
+ addFlowFile(flowFile);
+ flowFileToEnclosure.put(flowFile, enclosure);
+ }
+
+ public StatementFlowFileEnclosure getTargetEnclosure(final FlowFile flowFile) {
+ return flowFileToEnclosure.get(flowFile);
+ }
+ }
+
/**
* A simple, immutable data structure to hold a Prepared Statement and a List of FlowFiles
* for which that statement should be evaluated.
*/
- private static class StatementFlowFileEnclosure {
- private final PreparedStatement statement;
+ private static class StatementFlowFileEnclosure implements FlowFileGroup {
+ private final String sql;
+ private PreparedStatement statement;
private final List<FlowFile> flowFiles = new ArrayList<>();
- public StatementFlowFileEnclosure(final PreparedStatement statement) {
- this.statement = statement;
+ public StatementFlowFileEnclosure(String sql) {
+ this.sql = sql;
}
- public PreparedStatement getStatement() {
+ public PreparedStatement getNewStatement(final Connection conn, final boolean obtainKeys) throws SQLException {
+ if (obtainKeys) {
+ // Create a new Prepared Statement, requesting that it return the generated keys.
+ PreparedStatement stmt = conn.prepareStatement(sql, Statement.RETURN_GENERATED_KEYS);
+
+ if (stmt == null) {
+ // since we are passing Statement.RETURN_GENERATED_KEYS, calls to conn.prepareStatement will
+ // in some cases (at least for DerbyDB) return null.
+ // We will attempt to recompile the statement without the generated keys being returned.
+ stmt = conn.prepareStatement(sql);
+ }
+
+ // If we need to obtain keys, then we cannot do a Batch Update. In this case,
+ // there is no need to cache the PreparedStatement, because we aren't
+ // doing an addBatch/executeBatch. Instead, we will use the statement once
+ // and close it.
+ return stmt;
+ }
+
+ return conn.prepareStatement(sql);
+ }
+
+ public PreparedStatement getCachedStatement(final Connection conn) throws SQLException {
+ if (statement != null) {
+ return statement;
+ }
+
+ statement = conn.prepareStatement(sql);
return statement;
}
+ @Override
public List<FlowFile> getFlowFiles() {
return flowFiles;
}
@@ -997,7 +1082,7 @@ public class PutSQL extends AbstractProcessor {
@Override
public int hashCode() {
- return statement.hashCode();
+ return sql.hashCode();
}
@Override
@@ -1013,7 +1098,7 @@ public class PutSQL extends AbstractProcessor {
}
final StatementFlowFileEnclosure other = (StatementFlowFileEnclosure) obj;
- return statement.equals(other.getStatement());
+ return sql.equals(other.sql);
}
}
}
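To make the new structure of PutSQL above easier to follow: error handling is now funneled through the ExceptionHandler/RollbackOnFailure pattern classes. Exceptions raised while binding parameters or executing statements are mapped to an ErrorTypes value (SQLNonTransientException leads to failure, other SQLExceptions to retry), the route is adjusted when Rollback On Failure is enabled (re-throw instead of transferring), and only then is the FlowFile routed. The following is a simplified, self-contained sketch of that control flow in plain Java; it deliberately does not use the real org.apache.nifi.processor.util.pattern API, whose exact signatures are only partially visible in this diff, so every type and method name here is an illustrative stand-in.

    import java.sql.SQLException;
    import java.sql.SQLNonTransientException;

    // Simplified stand-in for org.apache.nifi.processor.util.pattern.ErrorTypes.
    enum ErrorType { INVALID_INPUT, TEMPORARY_FAILURE, UNKNOWN_FAILURE }

    final class RollbackOnFailureSketch {

        @FunctionalInterface
        interface SqlWork { void run() throws Exception; }

        // Mirrors the mapping registered via exceptionHandler.mapException(...) in the diff above.
        static ErrorType mapException(final Exception e) {
            if (e instanceof SQLNonTransientException) {
                return ErrorType.INVALID_INPUT;      // normally routed to failure
            } else if (e instanceof SQLException) {
                return ErrorType.TEMPORARY_FAILURE;  // normally routed to retry
            }
            return ErrorType.UNKNOWN_FAILURE;
        }

        // Mirrors the role of exceptionHandler.execute(...): run the work for one FlowFile,
        // and either route it on error or, with Rollback On Failure enabled, re-throw so the
        // process session and the JDBC transaction are rolled back as a whole.
        static boolean execute(final boolean rollbackOnFailure, final SqlWork work) {
            try {
                work.run();
                return true; // caller routes this FlowFile to success
            } catch (final Exception e) {
                final ErrorType type = mapException(e);
                if (rollbackOnFailure) {
                    throw new RuntimeException("Rolling back whole session due to " + type, e);
                }
                System.out.println("Routing FlowFile to " + (type == ErrorType.TEMPORARY_FAILURE ? "retry" : "failure"));
                return false; // caller skips this FlowFile and keeps processing the rest
            }
        }

        public static void main(final String[] args) {
            // Default behavior: the bad statement is routed and processing continues.
            execute(false, () -> { throw new SQLNonTransientException("bad syntax"); });

            // Rollback On Failure: the same error aborts the whole batch.
            try {
                execute(true, () -> { throw new SQLNonTransientException("bad syntax"); });
            } catch (final RuntimeException expected) {
                System.out.println(expected.getMessage());
            }
        }
    }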
http://git-wip-us.apache.org/repos/asf/nifi/blob/d9acdb54/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/groovy/org/apache/nifi/processors/standard/TestPutDatabaseRecord.groovy
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/groovy/org/apache/nifi/processors/standard/TestPutDatabaseRecord.groovy b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/groovy/org/apache/nifi/processors/standard/TestPutDatabaseRecord.groovy
index bb12fb4..e99f43a 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/groovy/org/apache/nifi/processors/standard/TestPutDatabaseRecord.groovy
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/groovy/org/apache/nifi/processors/standard/TestPutDatabaseRecord.groovy
@@ -17,6 +17,7 @@
package org.apache.nifi.processors.standard
import org.apache.nifi.processor.exception.ProcessException
+import org.apache.nifi.processor.util.pattern.RollbackOnFailure
import org.apache.nifi.processors.standard.util.record.MockRecordParser
import org.apache.nifi.reporting.InitializationException
import org.apache.nifi.serialization.record.RecordField
@@ -42,7 +43,6 @@ import java.sql.Statement
import static org.junit.Assert.assertEquals
import static org.junit.Assert.assertFalse
-import static org.junit.Assert.assertNull
import static org.junit.Assert.assertTrue
import static org.junit.Assert.fail
import static org.mockito.Mockito.spy
@@ -53,7 +53,8 @@ import static org.mockito.Mockito.spy
@RunWith(JUnit4.class)
class TestPutDatabaseRecord {
- private static final String createPersons = "CREATE TABLE PERSONS (id integer primary key, name varchar(100), code integer)"
+ private static final String createPersons = "CREATE TABLE PERSONS (id integer primary key, name varchar(100)," +
+ " code integer CONSTRAINT CODE_RANGE CHECK (code >= 0 AND code < 1000))"
private final static String DB_LOCATION = "target/db_pdr"
TestRunner runner
@@ -131,33 +132,20 @@ class TestPutDatabaseRecord {
] as PutDatabaseRecord.TableSchema
+ runner.setProperty(PutDatabaseRecord.TRANSLATE_FIELD_NAMES, 'false')
+ runner.setProperty(PutDatabaseRecord.UNMATCHED_FIELD_BEHAVIOR, 'false')
+ runner.setProperty(PutDatabaseRecord.UNMATCHED_COLUMN_BEHAVIOR, 'false')
+ runner.setProperty(PutDatabaseRecord.QUOTED_IDENTIFIERS, 'false')
+ runner.setProperty(PutDatabaseRecord.QUOTED_TABLE_IDENTIFIER, 'false')
+ def settings = new PutDatabaseRecord.DMLSettings(runner.getProcessContext())
+
processor.with {
- try {
- assertNull(generateInsert(null, null, null,
- false, false, false, false,
- false, false).sql)
- fail('Expecting ProcessException')
- } catch (ProcessException ignore) {
- // Expected
- }
- try {
- assertNull(generateInsert(null, 'PERSONS', null,
- false, false, false, false,
- false, false).sql)
- fail('Expecting ProcessException')
- } catch (ProcessException ignore) {
- // Expected
- }
assertEquals('INSERT INTO PERSONS (id, name, code) VALUES (?,?,?)',
- generateInsert(schema, 'PERSONS', tableSchema,
- false, false, false, false,
- false, false).sql)
+ generateInsert(schema, 'PERSONS', tableSchema, settings).sql)
assertEquals('DELETE FROM PERSONS WHERE id = ? AND name = ? AND code = ?',
- generateDelete(schema, 'PERSONS', tableSchema,
- false, false, false, false,
- false, false).sql)
+ generateDelete(schema, 'PERSONS', tableSchema, settings).sql)
}
}
@@ -211,6 +199,81 @@ class TestPutDatabaseRecord {
}
@Test
+ void testInsertBatchUpdateException() throws InitializationException, ProcessException, SQLException, IOException {
+ recreateTable("PERSONS", createPersons)
+ final MockRecordParser parser = new MockRecordParser()
+ runner.addControllerService("parser", parser)
+ runner.enableControllerService(parser)
+
+ parser.addSchemaField("id", RecordFieldType.INT)
+ parser.addSchemaField("name", RecordFieldType.STRING)
+ parser.addSchemaField("code", RecordFieldType.INT)
+
+ parser.addRecord(1, 'rec1', 101)
+ parser.addRecord(2, 'rec2', 102)
+ parser.addRecord(3, 'rec3', 1000)
+ parser.addRecord(4, 'rec4', 104)
+
+ runner.setProperty(PutDatabaseRecord.RECORD_READER_FACTORY, 'parser')
+ runner.setProperty(PutDatabaseRecord.STATEMENT_TYPE, PutDatabaseRecord.INSERT_TYPE)
+ runner.setProperty(PutDatabaseRecord.TABLE_NAME, 'PERSONS')
+
+ runner.enqueue(new byte[0])
+ runner.run()
+
+ runner.assertTransferCount(PutDatabaseRecord.REL_SUCCESS, 0)
+ runner.assertTransferCount(PutDatabaseRecord.REL_FAILURE, 0)
+ runner.assertTransferCount(PutDatabaseRecord.REL_RETRY, 1)
+ final Connection conn = dbcp.getConnection()
+ final Statement stmt = conn.createStatement()
+ final ResultSet rs = stmt.executeQuery('SELECT * FROM PERSONS')
+ // Transaction should be rolled back and table should remain empty.
+ assertFalse(rs.next())
+
+ stmt.close()
+ conn.close()
+ }
+
+ @Test
+ void testInsertBatchUpdateExceptionRollbackOnFailure() throws InitializationException, ProcessException, SQLException, IOException {
+ recreateTable("PERSONS", createPersons)
+ final MockRecordParser parser = new MockRecordParser()
+ runner.addControllerService("parser", parser)
+ runner.enableControllerService(parser)
+
+ parser.addSchemaField("id", RecordFieldType.INT)
+ parser.addSchemaField("name", RecordFieldType.STRING)
+ parser.addSchemaField("code", RecordFieldType.INT)
+
+ parser.addRecord(1, 'rec1', 101)
+ parser.addRecord(2, 'rec2', 102)
+ parser.addRecord(3, 'rec3', 1000)
+ parser.addRecord(4, 'rec4', 104)
+
+ runner.setProperty(PutDatabaseRecord.RECORD_READER_FACTORY, 'parser')
+ runner.setProperty(PutDatabaseRecord.STATEMENT_TYPE, PutDatabaseRecord.INSERT_TYPE)
+ runner.setProperty(PutDatabaseRecord.TABLE_NAME, 'PERSONS')
+ runner.setProperty(RollbackOnFailure.ROLLBACK_ON_FAILURE, 'true')
+
+ runner.enqueue(new byte[0])
+ try {
+ runner.run()
+ fail("ProcessException is expected")
+ } catch (AssertionError e) {
+ assertTrue(e.getCause() instanceof ProcessException)
+ }
+
+ final Connection conn = dbcp.getConnection()
+ final Statement stmt = conn.createStatement()
+ final ResultSet rs = stmt.executeQuery('SELECT * FROM PERSONS')
+ // Transaction should be rolled back and table should remain empty.
+ assertFalse(rs.next())
+
+ stmt.close()
+ conn.close()
+ }
+
+ @Test
void testInsertNoTable() throws InitializationException, ProcessException, SQLException, IOException {
recreateTable("PERSONS", createPersons)
final MockRecordParser parser = new MockRecordParser()
@@ -300,6 +363,37 @@ class TestPutDatabaseRecord {
}
@Test
+ void testSqlStatementTypeNoValueRollbackOnFailure() throws InitializationException, ProcessException, SQLException, IOException {
+ recreateTable("PERSONS", createPersons)
+ final MockRecordParser parser = new MockRecordParser()
+ runner.addControllerService("parser", parser)
+ runner.enableControllerService(parser)
+
+ parser.addSchemaField("sql", RecordFieldType.STRING)
+
+ parser.addRecord('')
+
+ runner.setProperty(PutDatabaseRecord.RECORD_READER_FACTORY, 'parser')
+ runner.setProperty(PutDatabaseRecord.STATEMENT_TYPE, PutDatabaseRecord.USE_ATTR_TYPE)
+ runner.setProperty(PutDatabaseRecord.TABLE_NAME, 'PERSONS')
+ runner.setProperty(PutDatabaseRecord.FIELD_CONTAINING_SQL, 'sql')
+ runner.setProperty(RollbackOnFailure.ROLLBACK_ON_FAILURE, 'true')
+
+ def attrs = [:]
+ attrs[PutDatabaseRecord.STATEMENT_TYPE_ATTRIBUTE] = 'sql'
+ runner.enqueue(new byte[0], attrs)
+ try {
+ runner.run()
+ fail("ProcessException is expected")
+ } catch (AssertionError e) {
+ assertTrue(e.getCause() instanceof ProcessException)
+ }
+
+ runner.assertTransferCount(PutDatabaseRecord.REL_SUCCESS, 0)
+ runner.assertTransferCount(PutDatabaseRecord.REL_FAILURE, 0)
+ }
+
+ @Test
void testUpdate() throws InitializationException, ProcessException, SQLException, IOException {
recreateTable("PERSONS", createPersons)
final MockRecordParser parser = new MockRecordParser()
http://git-wip-us.apache.org/repos/asf/nifi/blob/d9acdb54/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPutSQL.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPutSQL.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPutSQL.java
index 5e9ab58..2d0491b 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPutSQL.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPutSQL.java
@@ -19,6 +19,7 @@ package org.apache.nifi.processors.standard;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
import java.io.File;
import java.io.IOException;
@@ -39,6 +40,7 @@ import org.apache.commons.lang3.RandomUtils;
import org.apache.nifi.controller.AbstractControllerService;
import org.apache.nifi.dbcp.DBCPService;
import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.pattern.RollbackOnFailure;
import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner;
@@ -165,6 +167,29 @@ public class TestPutSQL {
runner.assertTransferCount(PutSQL.REL_SUCCESS, 3);
}
+ @Test
+ public void testFailInMiddleWithBadStatementRollbackOnFailure() throws InitializationException, ProcessException, SQLException, IOException {
+ final TestRunner runner = TestRunners.newTestRunner(PutSQL.class);
+ runner.addControllerService("dbcp", service);
+ runner.enableControllerService(service);
+ runner.setProperty(PutSQL.OBTAIN_GENERATED_KEYS, "false");
+ runner.setProperty(PutSQL.CONNECTION_POOL, "dbcp");
+ runner.setProperty(RollbackOnFailure.ROLLBACK_ON_FAILURE, "true");
+ runner.enqueue("INSERT INTO PERSONS_AI (NAME, CODE) VALUES ('Mark', 84)".getBytes());
+ runner.enqueue("INSERT INTO PERSONS_AI".getBytes()); // intentionally wrong syntax
+ runner.enqueue("INSERT INTO PERSONS_AI (NAME, CODE) VALUES ('Tom', 3)".getBytes());
+ runner.enqueue("INSERT INTO PERSONS_AI (NAME, CODE) VALUES ('Harry', 44)".getBytes());
+
+ try {
+ runner.run();
+ fail("ProcessException should be thrown");
+ } catch (AssertionError e) {
+ assertTrue(e.getCause() instanceof ProcessException);
+ runner.assertTransferCount(PutSQL.REL_FAILURE, 0);
+ runner.assertTransferCount(PutSQL.REL_SUCCESS, 0);
+ }
+ }
+
@Test
public void testFailInMiddleWithBadParameterType() throws InitializationException, ProcessException, SQLException, IOException {
@@ -191,8 +216,41 @@ public class TestPutSQL {
runner.assertTransferCount(PutSQL.REL_FAILURE, 1);
runner.assertTransferCount(PutSQL.REL_SUCCESS, 3);
+
}
+ @Test
+ public void testFailInMiddleWithBadParameterTypeRollbackOnFailure() throws InitializationException, ProcessException, SQLException, IOException {
+ final TestRunner runner = TestRunners.newTestRunner(PutSQL.class);
+ runner.addControllerService("dbcp", service);
+ runner.enableControllerService(service);
+ runner.setProperty(PutSQL.OBTAIN_GENERATED_KEYS, "false");
+ runner.setProperty(PutSQL.CONNECTION_POOL, "dbcp");
+ runner.setProperty(RollbackOnFailure.ROLLBACK_ON_FAILURE, "true");
+
+ final Map<String, String> goodAttributes = new HashMap<>();
+ goodAttributes.put("sql.args.1.type", String.valueOf(Types.INTEGER));
+ goodAttributes.put("sql.args.1.value", "84");
+
+ final Map<String, String> badAttributes = new HashMap<>();
+ badAttributes.put("sql.args.1.type", String.valueOf(Types.VARCHAR));
+ badAttributes.put("sql.args.1.value", "hello");
+
+ final byte[] data = "INSERT INTO PERSONS_AI (NAME, CODE) VALUES ('Mark', ?)".getBytes();
+ runner.enqueue(data, goodAttributes);
+ runner.enqueue(data, badAttributes);
+ runner.enqueue(data, goodAttributes);
+ runner.enqueue(data, goodAttributes);
+
+ try {
+ runner.run();
+ fail("ProcessException should be thrown");
+ } catch (AssertionError e) {
+ assertTrue(e.getCause() instanceof ProcessException);
+ runner.assertTransferCount(PutSQL.REL_FAILURE, 0);
+ runner.assertTransferCount(PutSQL.REL_SUCCESS, 0);
+ }
+ }
@Test
public void testFailInMiddleWithBadParameterValue() throws InitializationException, ProcessException, SQLException, IOException {
@@ -235,6 +293,48 @@ public class TestPutSQL {
}
}
+ @Test
+ public void testFailInMiddleWithBadParameterValueRollbackOnFailure() throws InitializationException, ProcessException, SQLException, IOException {
+ final TestRunner runner = TestRunners.newTestRunner(PutSQL.class);
+ runner.addControllerService("dbcp", service);
+ runner.enableControllerService(service);
+ runner.setProperty(PutSQL.OBTAIN_GENERATED_KEYS, "false");
+ runner.setProperty(PutSQL.CONNECTION_POOL, "dbcp");
+ runner.setProperty(RollbackOnFailure.ROLLBACK_ON_FAILURE, "true");
+
+ recreateTable("PERSONS_AI",createPersonsAutoId);
+
+ final Map<String, String> goodAttributes = new HashMap<>();
+ goodAttributes.put("sql.args.1.type", String.valueOf(Types.INTEGER));
+ goodAttributes.put("sql.args.1.value", "84");
+
+ final Map<String, String> badAttributes = new HashMap<>();
+ badAttributes.put("sql.args.1.type", String.valueOf(Types.INTEGER));
+ badAttributes.put("sql.args.1.value", "9999");
+
+ final byte[] data = "INSERT INTO PERSONS_AI (NAME, CODE) VALUES ('Mark', ?)".getBytes();
+ runner.enqueue(data, goodAttributes);
+ runner.enqueue(data, badAttributes);
+ runner.enqueue(data, goodAttributes);
+ runner.enqueue(data, goodAttributes);
+
+ try {
+ runner.run();
+ fail("ProcessException should be thrown");
+ } catch (AssertionError e) {
+ assertTrue(e.getCause() instanceof ProcessException);
+ runner.assertTransferCount(PutSQL.REL_FAILURE, 0);
+ runner.assertTransferCount(PutSQL.REL_SUCCESS, 0);
+ }
+
+ try (final Connection conn = service.getConnection()) {
+ try (final Statement stmt = conn.createStatement()) {
+ final ResultSet rs = stmt.executeQuery("SELECT * FROM PERSONS_AI");
+ assertFalse(rs.next());
+ }
+ }
+ }
+
@Test
public void testUsingSqlDataTypesWithNegativeValues() throws InitializationException, ProcessException, SQLException, IOException {
@@ -666,6 +766,8 @@ public class TestPutSQL {
runner.enableControllerService(service);
runner.setProperty(PutSQL.CONNECTION_POOL, "dbcp");
+ recreateTable("PERSONS", createPersons);
+
final String sql = "INSERT INTO PERSONS (ID, NAME, CODE) VALUES (?, ?, ?); " +
"UPDATE PERSONS SET NAME='George' WHERE ID=?; ";
final Map<String, String> attributes = new HashMap<>();
@@ -695,6 +797,47 @@ public class TestPutSQL {
}
}
+ @Test
+ public void testMultipleStatementsWithinFlowFileRollbackOnFailure() throws InitializationException, ProcessException, SQLException, IOException {
+ final TestRunner runner = TestRunners.newTestRunner(PutSQL.class);
+ runner.addControllerService("dbcp", service);
+ runner.enableControllerService(service);
+ runner.setProperty(PutSQL.CONNECTION_POOL, "dbcp");
+ runner.setProperty(RollbackOnFailure.ROLLBACK_ON_FAILURE, "true");
+
+ recreateTable("PERSONS", createPersons);
+
+ final String sql = "INSERT INTO PERSONS (ID, NAME, CODE) VALUES (?, ?, ?); " +
+ "UPDATE PERSONS SET NAME='George' WHERE ID=?; ";
+ final Map<String, String> attributes = new HashMap<>();
+ attributes.put("sql.args.1.type", String.valueOf(Types.INTEGER));
+ attributes.put("sql.args.1.value", "1");
+
+ attributes.put("sql.args.2.type", String.valueOf(Types.VARCHAR));
+ attributes.put("sql.args.2.value", "Mark");
+
+ attributes.put("sql.args.3.type", String.valueOf(Types.INTEGER));
+ attributes.put("sql.args.3.value", "84");
+
+ attributes.put("sql.args.4.type", String.valueOf(Types.INTEGER));
+ attributes.put("sql.args.4.value", "1");
+
+ runner.enqueue(sql.getBytes(), attributes);
+ try {
+ runner.run();
+ fail("ProcessException should be thrown");
+ } catch (AssertionError e) {
+ assertTrue(e.getCause() instanceof ProcessException);
+ }
+
+ try (final Connection conn = service.getConnection()) {
+ try (final Statement stmt = conn.createStatement()) {
+ final ResultSet rs = stmt.executeQuery("SELECT * FROM PERSONS");
+ assertFalse(rs.next());
+ }
+ }
+ }
+
@Test
public void testWithNullParameter() throws InitializationException, ProcessException, SQLException, IOException {
@@ -766,6 +909,47 @@ public class TestPutSQL {
}
}
+ @Test
+ public void testInvalidStatementRollbackOnFailure() throws InitializationException, ProcessException, SQLException, IOException {
+ final TestRunner runner = TestRunners.newTestRunner(PutSQL.class);
+ runner.addControllerService("dbcp", service);
+ runner.enableControllerService(service);
+ runner.setProperty(PutSQL.CONNECTION_POOL, "dbcp");
+ runner.setProperty(RollbackOnFailure.ROLLBACK_ON_FAILURE, "true");
+
+ recreateTable("PERSONS", createPersons);
+
+ final String sql = "INSERT INTO PERSONS (ID, NAME, CODE) VALUES (?, ?, ?); " +
+ "UPDATE SOME_RANDOM_TABLE NAME='George' WHERE ID=?; ";
+ final Map<String, String> attributes = new HashMap<>();
+ attributes.put("sql.args.1.type", String.valueOf(Types.INTEGER));
+ attributes.put("sql.args.1.value", "1");
+
+ attributes.put("sql.args.2.type", String.valueOf(Types.VARCHAR));
+ attributes.put("sql.args.2.value", "Mark");
+
+ attributes.put("sql.args.3.type", String.valueOf(Types.INTEGER));
+ attributes.put("sql.args.3.value", "84");
+
+ attributes.put("sql.args.4.type", String.valueOf(Types.INTEGER));
+ attributes.put("sql.args.4.value", "1");
+
+ runner.enqueue(sql.getBytes(), attributes);
+ try {
+ runner.run();
+ fail("ProcessException should be thrown");
+ } catch (AssertionError e) {
+ assertTrue(e.getCause() instanceof ProcessException);
+ }
+
+ try (final Connection conn = service.getConnection()) {
+ try (final Statement stmt = conn.createStatement()) {
+ final ResultSet rs = stmt.executeQuery("SELECT * FROM PERSONS");
+ assertFalse(rs.next());
+ }
+ }
+ }
+
@Test
public void testRetryableFailure() throws InitializationException, ProcessException, SQLException, IOException {
@@ -798,6 +982,42 @@ public class TestPutSQL {
runner.assertAllFlowFilesTransferred(PutSQL.REL_RETRY, 1);
}
+ @Test
+ public void testRetryableFailureRollbackOnFailure() throws InitializationException, ProcessException, SQLException, IOException {
+ final TestRunner runner = TestRunners.newTestRunner(PutSQL.class);
+ final DBCPService service = new SQLExceptionService(null);
+ runner.addControllerService("dbcp", service);
+ runner.enableControllerService(service);
+
+ runner.setProperty(PutSQL.CONNECTION_POOL, "dbcp");
+ runner.setProperty(RollbackOnFailure.ROLLBACK_ON_FAILURE, "true");
+
+ final String sql = "INSERT INTO PERSONS (ID, NAME, CODE) VALUES (?, ?, ?); " +
+ "UPDATE PERSONS SET NAME='George' WHERE ID=?; ";
+ final Map<String, String> attributes = new HashMap<>();
+ attributes.put("sql.args.1.type", String.valueOf(Types.INTEGER));
+ attributes.put("sql.args.1.value", "1");
+
+ attributes.put("sql.args.2.type", String.valueOf(Types.VARCHAR));
+ attributes.put("sql.args.2.value", "Mark");
+
+ attributes.put("sql.args.3.type", String.valueOf(Types.INTEGER));
+ attributes.put("sql.args.3.value", "84");
+
+ attributes.put("sql.args.4.type", String.valueOf(Types.INTEGER));
+ attributes.put("sql.args.4.value", "1");
+
+ runner.enqueue(sql.getBytes(), attributes);
+ try {
+ runner.run();
+ fail("ProcessException should be thrown");
+ } catch (AssertionError e) {
+ assertTrue(e.getCause() instanceof ProcessException);
+ // Should not be routed to retry.
+ runner.assertAllFlowFilesTransferred(PutSQL.REL_RETRY, 0);
+ }
+
+ }
@Test
public void testMultipleFlowFilesSuccessfulInTransaction() throws InitializationException, ProcessException, SQLException, IOException {
@@ -857,6 +1077,38 @@ public class TestPutSQL {
}
}
+ @Test
+ public void testMultipleFlowFilesSuccessfulInTransactionRollBackOnFailure() throws InitializationException, ProcessException, SQLException, IOException {
+ final TestRunner runner = TestRunners.newTestRunner(PutSQL.class);
+ runner.addControllerService("dbcp", service);
+ runner.enableControllerService(service);
+ runner.setProperty(PutSQL.CONNECTION_POOL, "dbcp");
+ runner.setProperty(PutSQL.BATCH_SIZE, "1");
+ runner.setProperty(RollbackOnFailure.ROLLBACK_ON_FAILURE, "true");
+
+ recreateTable("PERSONS", createPersons);
+
+ final Map<String, String> attributes = new HashMap<>();
+ attributes.put("sql.args.1.type", String.valueOf(Types.INTEGER));
+ attributes.put("sql.args.1.value", "1");
+
+ attributes.put("sql.args.2.type", String.valueOf(Types.VARCHAR));
+ attributes.put("sql.args.2.value", "Mark");
+
+ attributes.put("sql.args.3.type", String.valueOf(Types.INTEGER));
+ attributes.put("sql.args.3.value", "84");
+
+ attributes.put("fragment.identifier", "1");
+ attributes.put("fragment.count", "2");
+ attributes.put("fragment.index", "0");
+ runner.enqueue("INSERT INTO PERSONS (ID, NAME, CODE) VALUES (?, ?, ?)".getBytes(), attributes);
+ // ProcessException should not be thrown in this case, because the input FlowFiles are simply deferred.
+ runner.run();
+
+ // No FlowFiles should be transferred because there were not enough FlowFiles with the same fragment identifier.
+ runner.assertAllFlowFilesTransferred(PutSQL.REL_SUCCESS, 0);
+
+ }
@Test
public void testTransactionTimeout() throws InitializationException, ProcessException, SQLException, IOException {
@@ -895,6 +1147,81 @@ public class TestPutSQL {
runner.assertAllFlowFilesTransferred(PutSQL.REL_FAILURE, 1);
}
+ @Test
+ public void testTransactionTimeoutRollbackOnFailure() throws InitializationException, ProcessException, SQLException, IOException {
+ final TestRunner runner = TestRunners.newTestRunner(PutSQL.class);
+ runner.addControllerService("dbcp", service);
+ runner.enableControllerService(service);
+
+ runner.setProperty(PutSQL.TRANSACTION_TIMEOUT, "5 secs");
+ runner.setProperty(PutSQL.CONNECTION_POOL, "dbcp");
+ runner.setProperty(RollbackOnFailure.ROLLBACK_ON_FAILURE, "true");
+ final Map<String, String> attributes = new HashMap<>();
+ attributes.put("fragment.identifier", "1");
+ attributes.put("fragment.count", "2");
+ attributes.put("fragment.index", "0");
+
+ final MockFlowFile mff = new MockFlowFile(0L) {
+ @Override
+ public Long getLastQueueDate() {
+ return System.currentTimeMillis() - 10000L; // return 10 seconds ago
+ }
+
+ @Override
+ public Map<String, String> getAttributes() {
+ return attributes;
+ }
+
+ @Override
+ public String getAttribute(final String attrName) {
+ return attributes.get(attrName);
+ }
+ };
+
+ runner.enqueue(mff);
+ try {
+ runner.run();
+ fail("ProcessException should be thrown");
+ } catch (AssertionError e) {
+ assertTrue(e.getCause() instanceof ProcessException);
+ }
+
+ runner.assertAllFlowFilesTransferred(PutSQL.REL_FAILURE, 0);
+ }
+
+ @Test
+ public void testNullFragmentCountRollbackOnFailure() throws InitializationException, ProcessException, SQLException, IOException {
+ final TestRunner runner = TestRunners.newTestRunner(PutSQL.class);
+ runner.addControllerService("dbcp", service);
+ runner.enableControllerService(service);
+
+ runner.setProperty(PutSQL.TRANSACTION_TIMEOUT, "5 secs");
+ runner.setProperty(PutSQL.CONNECTION_POOL, "dbcp");
+ runner.setProperty(RollbackOnFailure.ROLLBACK_ON_FAILURE, "true");
+ final Map<String, String> attribute1 = new HashMap<>();
+ attribute1.put("fragment.identifier", "1");
+ attribute1.put("fragment.count", "2");
+ attribute1.put("fragment.index", "0");
+
+ final Map<String, String> attribute2 = new HashMap<>();
+ attribute2.put("fragment.identifier", "1");
+// attribute2.put("fragment.count", null);
+ attribute2.put("fragment.index", "1");
+
+ runner.enqueue(new byte[]{}, attribute1);
+ runner.enqueue(new byte[]{}, attribute2);
+
+
+ try {
+ runner.run();
+ fail("ProcessException should be thrown");
+ } catch (AssertionError e) {
+ assertTrue(e.getCause() instanceof ProcessException);
+ }
+
+ runner.assertAllFlowFilesTransferred(PutSQL.REL_FAILURE, 0);
+ }
+
/**
* Simple implementation only for testing purposes
*/
@@ -985,4 +1312,5 @@ public class TestPutSQL {
byte[] bBinary = RandomUtils.nextBytes(length);
return DatatypeConverter.printBase64Binary(bBinary);
}
+
}
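The rollback-on-failure tests added above all repeat the same assertion idiom: run the processor, expect the TestRunner to surface the ProcessException as an AssertionError, and verify that nothing was transferred so the FlowFiles remain in the incoming queue. A minimal sketch of that idiom is shown below; the helper method name is hypothetical and not part of this commit.

    // Hypothetical helper illustrating the assertion pattern used by the
    // RollbackOnFailure tests above; only the NiFi test API calls are taken from the diff.
    private void assertRollbackOnFailure(final TestRunner runner, final Relationship... relationships) {
        try {
            runner.run();
            fail("ProcessException should be thrown");
        } catch (AssertionError e) {
            // TestRunner wraps the ProcessException thrown by onTrigger in an AssertionError.
            assertTrue(e.getCause() instanceof ProcessException);
        }
        // The session was rolled back, so no FlowFile should have been routed anywhere.
        for (final Relationship relationship : relationships) {
            runner.assertTransferCount(relationship, 0);
        }
    }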
[3/5] nifi git commit: NIFI-3415: Add Rollback on Failure.
Posted by ma...@apache.org.
http://git-wip-us.apache.org/repos/asf/nifi/blob/d9acdb54/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/test/java/org/apache/nifi/processors/hive/TestPutHiveStreaming.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/test/java/org/apache/nifi/processors/hive/TestPutHiveStreaming.java b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/test/java/org/apache/nifi/processors/hive/TestPutHiveStreaming.java
index 61b5304..f642607 100644
--- a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/test/java/org/apache/nifi/processors/hive/TestPutHiveStreaming.java
+++ b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/test/java/org/apache/nifi/processors/hive/TestPutHiveStreaming.java
@@ -34,7 +34,7 @@ import org.apache.hive.hcatalog.streaming.StreamingException;
import org.apache.hive.hcatalog.streaming.TransactionBatch;
import org.apache.nifi.hadoop.KerberosProperties;
import org.apache.nifi.hadoop.SecurityUtil;
-import org.apache.nifi.stream.io.ByteArrayOutputStream;
+import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
@@ -46,6 +46,7 @@ import org.junit.Before;
import org.junit.Test;
import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.IOException;
import java.util.Arrays;
@@ -57,11 +58,13 @@ import java.util.Map;
import java.util.concurrent.ExecutorService;
import static org.apache.nifi.processors.hive.PutHiveStreaming.HIVE_STREAMING_RECORD_COUNT_ATTR;
+import static org.apache.nifi.processors.hive.PutHiveStreaming.REL_SUCCESS;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyLong;
import static org.mockito.Matchers.anyString;
@@ -213,11 +216,33 @@ public class TestPutHiveStreaming {
}
@Test
- public void onTriggerMultipleRecords() throws Exception {
+ public void onTriggerBadInputRollbackOnFailure() throws Exception {
runner.setProperty(PutHiveStreaming.METASTORE_URI, "thrift://localhost:9083");
runner.setProperty(PutHiveStreaming.DB_NAME, "default");
runner.setProperty(PutHiveStreaming.TABLE_NAME, "users");
- runner.setProperty(PutHiveStreaming.TXNS_PER_BATCH, "2");
+ runner.setProperty(PutHiveStreaming.TXNS_PER_BATCH, "100");
+ runner.setProperty(PutHiveStreaming.ROLLBACK_ON_FAILURE, "true");
+ runner.setValidateExpressionUsage(false);
+ runner.enqueue("I am not an Avro record".getBytes());
+ try {
+ runner.run();
+ fail("ProcessException should be thrown");
+ } catch (AssertionError e) {
+ assertTrue(e.getCause() instanceof ProcessException);
+ }
+
+ runner.assertTransferCount(PutHiveStreaming.REL_FAILURE, 0);
+ // Assert incoming FlowFile stays in input queue.
+ assertEquals(1, runner.getQueueSize().getObjectCount());
+ }
+
+
+ @Test
+ public void onTriggerMultipleRecordsSingleTransaction() throws Exception {
+ runner.setProperty(PutHiveStreaming.METASTORE_URI, "thrift://localhost:9083");
+ runner.setProperty(PutHiveStreaming.DB_NAME, "default");
+ runner.setProperty(PutHiveStreaming.TABLE_NAME, "users");
+ runner.setProperty(PutHiveStreaming.RECORDS_PER_TXN, "100");
runner.setValidateExpressionUsage(false);
Map<String, Object> user1 = new HashMap<String, Object>() {
{
@@ -237,13 +262,191 @@ public class TestPutHiveStreaming {
put("favorite_number", 3);
}
};
- runner.enqueue(createAvroRecord(Arrays.asList(user1, user2, user3)));
+ final List<Map<String, Object>> users = Arrays.asList(user1, user2, user3);
+ runner.enqueue(createAvroRecord(users));
runner.run();
runner.assertTransferCount(PutHiveStreaming.REL_SUCCESS, 1);
MockFlowFile resultFlowFile = runner.getFlowFilesForRelationship(PutHiveStreaming.REL_SUCCESS).get(0);
- assertNotNull(resultFlowFile);
- assertEquals("3", resultFlowFile.getAttribute(PutHiveStreaming.HIVE_STREAMING_RECORD_COUNT_ATTR));
+ assertOutputAvroRecords(users, resultFlowFile);
+ }
+
+ @Test
+ public void onTriggerMultipleRecordsMultipleTransaction() throws Exception {
+ runner.setProperty(PutHiveStreaming.METASTORE_URI, "thrift://localhost:9083");
+ runner.setProperty(PutHiveStreaming.DB_NAME, "default");
+ runner.setProperty(PutHiveStreaming.TABLE_NAME, "users");
+ runner.setProperty(PutHiveStreaming.RECORDS_PER_TXN, "2");
+ runner.setValidateExpressionUsage(false);
+ Map<String, Object> user1 = new HashMap<String, Object>() {
+ {
+ put("name", "Joe");
+ put("favorite_number", 146);
+ }
+ };
+ Map<String, Object> user2 = new HashMap<String, Object>() {
+ {
+ put("name", "Mary");
+ put("favorite_number", 42);
+ }
+ };
+ Map<String, Object> user3 = new HashMap<String, Object>() {
+ {
+ put("name", "Matt");
+ put("favorite_number", 3);
+ }
+ };
+ final List<Map<String, Object>> users = Arrays.asList(user1, user2, user3);
+ runner.enqueue(createAvroRecord(users));
+ runner.run();
+
+ runner.assertTransferCount(PutHiveStreaming.REL_SUCCESS, 1);
+ MockFlowFile resultFlowFile = runner.getFlowFilesForRelationship(PutHiveStreaming.REL_SUCCESS).get(0);
+ assertOutputAvroRecords(users, resultFlowFile);
+ }
+
+ @Test
+ public void onTriggerMultipleRecordsFailInMiddle() throws Exception {
+ runner.setProperty(PutHiveStreaming.METASTORE_URI, "thrift://localhost:9083");
+ runner.setProperty(PutHiveStreaming.DB_NAME, "default");
+ runner.setProperty(PutHiveStreaming.TABLE_NAME, "users");
+ runner.setProperty(PutHiveStreaming.RECORDS_PER_TXN, "2");
+ runner.setValidateExpressionUsage(false);
+ processor.setGenerateWriteFailure(true, 1);
+ Map<String, Object> user1 = new HashMap<String, Object>() {
+ {
+ put("name", "Joe");
+ put("favorite_number", 146);
+ }
+ };
+ Map<String, Object> user2 = new HashMap<String, Object>() {
+ {
+ put("name", "Mary");
+ put("favorite_number", 42);
+ }
+ };
+ Map<String, Object> user3 = new HashMap<String, Object>() {
+ {
+ put("name", "Matt");
+ put("favorite_number", 3);
+ }
+ };
+ Map<String, Object> user4 = new HashMap<String, Object>() {
+ {
+ put("name", "Mike");
+ put("favorite_number", 345);
+ }
+ };
+ final List<Map<String, Object>> users = Arrays.asList(user1, user2, user3, user4);
+ runner.enqueue(createAvroRecord(users));
+ runner.run();
+
+ runner.assertTransferCount(PutHiveStreaming.REL_SUCCESS, 1);
+ runner.assertTransferCount(PutHiveStreaming.REL_FAILURE, 1);
+ runner.assertTransferCount(PutHiveStreaming.REL_RETRY, 0);
+
+ MockFlowFile resultFlowFile = runner.getFlowFilesForRelationship(PutHiveStreaming.REL_SUCCESS).get(0);
+ assertOutputAvroRecords(Arrays.asList(user1, user3, user4), resultFlowFile);
+
+ final MockFlowFile failedFlowFile = runner.getFlowFilesForRelationship(PutHiveStreaming.REL_FAILURE).get(0);
+ assertOutputAvroRecords(Arrays.asList(user2), failedFlowFile);
+ }
+
+ @Test
+ public void onTriggerMultipleRecordsFailInMiddleRollbackOnFailure() throws Exception {
+ runner.setProperty(PutHiveStreaming.METASTORE_URI, "thrift://localhost:9083");
+ runner.setProperty(PutHiveStreaming.DB_NAME, "default");
+ runner.setProperty(PutHiveStreaming.TABLE_NAME, "users");
+ runner.setProperty(PutHiveStreaming.RECORDS_PER_TXN, "2");
+ runner.setProperty(PutHiveStreaming.ROLLBACK_ON_FAILURE, "true");
+ runner.setValidateExpressionUsage(false);
+ processor.setGenerateWriteFailure(true, 1);
+ Map<String, Object> user1 = new HashMap<String, Object>() {
+ {
+ put("name", "Joe");
+ put("favorite_number", 146);
+ }
+ };
+ Map<String, Object> user2 = new HashMap<String, Object>() {
+ {
+ put("name", "Mary");
+ put("favorite_number", 42);
+ }
+ };
+ Map<String, Object> user3 = new HashMap<String, Object>() {
+ {
+ put("name", "Matt");
+ put("favorite_number", 3);
+ }
+ };
+ runner.enqueue(createAvroRecord(Arrays.asList(user1, user2, user3)));
+ try {
+ runner.run();
+ fail("ProcessException should be thrown, because any Hive Transaction is committed yet.");
+ } catch (AssertionError e) {
+ assertTrue(e.getCause() instanceof ProcessException);
+ }
+
+ runner.assertTransferCount(PutHiveStreaming.REL_SUCCESS, 0);
+ runner.assertTransferCount(PutHiveStreaming.REL_RETRY, 0);
+ runner.assertTransferCount(PutHiveStreaming.REL_FAILURE, 0);
+ // Assert incoming FlowFile stays in input queue.
+ assertEquals(1, runner.getQueueSize().getObjectCount());
+ }
+
+ @Test
+ public void onTriggerMultipleRecordsFailInMiddleRollbackOnFailureCommitted() throws Exception {
+ runner.setProperty(PutHiveStreaming.METASTORE_URI, "thrift://localhost:9083");
+ runner.setProperty(PutHiveStreaming.DB_NAME, "default");
+ runner.setProperty(PutHiveStreaming.TABLE_NAME, "users");
+ runner.setProperty(PutHiveStreaming.RECORDS_PER_TXN, "2");
+ runner.setProperty(PutHiveStreaming.ROLLBACK_ON_FAILURE, "true");
+ runner.setValidateExpressionUsage(false);
+ // The first two records are committed, then a write failure occurs at the 3rd record.
+ processor.setGenerateWriteFailure(true, 2);
+ Map<String, Object> user1 = new HashMap<String, Object>() {
+ {
+ put("name", "Joe");
+ put("favorite_number", 146);
+ }
+ };
+ Map<String, Object> user2 = new HashMap<String, Object>() {
+ {
+ put("name", "Mary");
+ put("favorite_number", 42);
+ }
+ };
+ Map<String, Object> user3 = new HashMap<String, Object>() {
+ {
+ put("name", "Matt");
+ put("favorite_number", 3);
+ }
+ };
+ Map<String, Object> user4 = new HashMap<String, Object>() {
+ {
+ put("name", "Mike");
+ put("favorite_number", 345);
+ }
+ };
+ runner.enqueue(createAvroRecord(Arrays.asList(user1, user2, user3, user4)));
+ // ProcessException should NOT be thrown, because a Hive transaction has already been committed.
+ runner.run();
+
+ runner.assertTransferCount(PutHiveStreaming.REL_SUCCESS, 1);
+ runner.assertTransferCount(PutHiveStreaming.REL_RETRY, 0);
+ runner.assertTransferCount(PutHiveStreaming.REL_FAILURE, 0);
+
+ // Assert transferred FlowFile.
+ assertOutputAvroRecords(Arrays.asList(user1, user2), runner.getFlowFilesForRelationship(REL_SUCCESS).get(0));
+
+ // Assert incoming FlowFile stays in input queue.
+ assertEquals(1, runner.getQueueSize().getObjectCount());
+
+ }
+
+ private void assertOutputAvroRecords(List<Map<String, Object>> expectedRecords, MockFlowFile resultFlowFile) throws IOException {
+ assertEquals(String.valueOf(expectedRecords.size()), resultFlowFile.getAttribute(PutHiveStreaming.HIVE_STREAMING_RECORD_COUNT_ATTR));
+
final DataFileStream<GenericRecord> reader = new DataFileStream<>(
new ByteArrayInputStream(resultFlowFile.toByteArray()),
new GenericDatumReader<GenericRecord>());
@@ -253,17 +456,20 @@ public class TestPutHiveStreaming {
// Verify that the schema is preserved
assertTrue(schema.equals(new Schema.Parser().parse(new File("src/test/resources/user.avsc"))));
- // Verify the records are intact. We can't guarantee order so check the total number and non-null fields
- assertTrue(reader.hasNext());
- GenericRecord record = reader.next(null);
- assertNotNull(record.get("name"));
- assertNotNull(record.get("favorite_number"));
- assertNull(record.get("favorite_color"));
- assertNull(record.get("scale"));
- assertTrue(reader.hasNext());
- record = reader.next(record);
- assertTrue(reader.hasNext());
- reader.next(record);
+ GenericRecord record = null;
+ for (Map<String, Object> expectedRecord : expectedRecords) {
+ assertTrue(reader.hasNext());
+ record = reader.next(record);
+ final String name = record.get("name").toString();
+ final Integer favorite_number = (Integer) record.get("favorite_number");
+ assertNotNull(name);
+ assertNotNull(favorite_number);
+ assertNull(record.get("favorite_color"));
+ assertNull(record.get("scale"));
+
+ assertEquals(expectedRecord.get("name"), name);
+ assertEquals(expectedRecord.get("favorite_number"), favorite_number);
+ }
assertFalse(reader.hasNext());
}
@@ -319,6 +525,39 @@ public class TestPutHiveStreaming {
}
@Test
+ public void onTriggerWithPartitionColumnsNotInRecordRollbackOnFailure() throws Exception {
+ runner.setProperty(PutHiveStreaming.METASTORE_URI, "thrift://localhost:9083");
+ runner.setProperty(PutHiveStreaming.DB_NAME, "default");
+ runner.setProperty(PutHiveStreaming.TABLE_NAME, "users");
+ runner.setProperty(PutHiveStreaming.TXNS_PER_BATCH, "100");
+ runner.setProperty(PutHiveStreaming.PARTITION_COLUMNS, "favorite_food");
+ runner.setProperty(PutHiveStreaming.AUTOCREATE_PARTITIONS, "false");
+ runner.setProperty(PutHiveStreaming.ROLLBACK_ON_FAILURE, "true");
+ runner.setValidateExpressionUsage(false);
+ Map<String, Object> user1 = new HashMap<String, Object>() {
+ {
+ put("name", "Joe");
+ put("favorite_number", 146);
+ put("favorite_color", "blue");
+ }
+ };
+
+ runner.enqueue(createAvroRecord(Collections.singletonList(user1)));
+ try {
+ runner.run();
+ fail("ProcessException should be thrown");
+ } catch (AssertionError e) {
+ assertTrue(e.getCause() instanceof ProcessException);
+ }
+
+ runner.assertTransferCount(PutHiveStreaming.REL_FAILURE, 0);
+ runner.assertTransferCount(PutHiveStreaming.REL_SUCCESS, 0);
+ runner.assertTransferCount(PutHiveStreaming.REL_RETRY, 0);
+ // Assert incoming FlowFile stays in input queue.
+ assertEquals(1, runner.getQueueSize().getObjectCount());
+ }
+
+ @Test
public void onTriggerWithRetireWriters() throws Exception {
runner.setProperty(PutHiveStreaming.METASTORE_URI, "thrift://localhost:9083");
runner.setProperty(PutHiveStreaming.DB_NAME, "default");
@@ -390,6 +629,36 @@ public class TestPutHiveStreaming {
}
@Test
+ public void onTriggerWithConnectFailureRollbackOnFailure() throws Exception {
+ processor.setGenerateConnectFailure(true);
+ runner.setProperty(PutHiveStreaming.METASTORE_URI, "thrift://localhost:9083");
+ runner.setProperty(PutHiveStreaming.DB_NAME, "default");
+ runner.setProperty(PutHiveStreaming.TABLE_NAME, "users");
+ runner.setProperty(PutHiveStreaming.TXNS_PER_BATCH, "100");
+ runner.setProperty(PutHiveStreaming.ROLLBACK_ON_FAILURE, "true");
+ runner.setValidateExpressionUsage(false);
+ Map<String, Object> user1 = new HashMap<String, Object>() {
+ {
+ put("name", "Joe");
+ put("favorite_number", 146);
+ }
+ };
+ runner.enqueue(createAvroRecord(Collections.singletonList(user1)));
+ try {
+ runner.run();
+ fail("ProcessException should be thrown");
+ } catch (AssertionError e) {
+ assertTrue(e.getCause() instanceof ProcessException);
+ }
+
+ runner.assertTransferCount(PutHiveStreaming.REL_RETRY, 0);
+ runner.assertTransferCount(PutHiveStreaming.REL_FAILURE, 0);
+ runner.assertTransferCount(PutHiveStreaming.REL_SUCCESS, 0);
+ // Assert incoming FlowFile stays in input queue.
+ assertEquals(1, runner.getQueueSize().getObjectCount());
+ }
+
+ @Test
public void onTriggerWithInterruptedException() throws Exception {
processor.setGenerateInterruptedExceptionOnCreateWriter(true);
runner.setProperty(PutHiveStreaming.METASTORE_URI, "thrift://localhost:9083");
@@ -410,6 +679,32 @@ public class TestPutHiveStreaming {
}
@Test
+ public void onTriggerWithInterruptedExceptionRollbackOnFailure() throws Exception {
+ processor.setGenerateInterruptedExceptionOnCreateWriter(true);
+ runner.setProperty(PutHiveStreaming.METASTORE_URI, "thrift://localhost:9083");
+ runner.setProperty(PutHiveStreaming.DB_NAME, "default");
+ runner.setProperty(PutHiveStreaming.TABLE_NAME, "users");
+ runner.setProperty(PutHiveStreaming.TXNS_PER_BATCH, "100");
+ runner.setProperty(PutHiveStreaming.ROLLBACK_ON_FAILURE, "true");
+ runner.setValidateExpressionUsage(false);
+ Map<String, Object> user1 = new HashMap<String, Object>() {
+ {
+ put("name", "Joe");
+ put("favorite_number", 146);
+ }
+ };
+ runner.enqueue(createAvroRecord(Collections.singletonList(user1)));
+ try {
+ runner.run();
+ fail("ProcessException should be thrown");
+ } catch (AssertionError e) {
+ assertTrue(e.getCause() instanceof ProcessException);
+ }
+
+ runner.assertTransferCount(PutHiveStreaming.REL_RETRY, 0);
+ }
+
+ @Test
public void onTriggerWithWriteFailure() throws Exception {
processor.setGenerateWriteFailure(true);
runner.setProperty(PutHiveStreaming.METASTORE_URI, "thrift://localhost:9083");
@@ -437,6 +732,40 @@ public class TestPutHiveStreaming {
}
@Test
+ public void onTriggerWithWriteFailureRollbackOnFailure() throws Exception {
+ processor.setGenerateWriteFailure(true);
+ runner.setProperty(PutHiveStreaming.METASTORE_URI, "thrift://localhost:9083");
+ runner.setProperty(PutHiveStreaming.DB_NAME, "default");
+ runner.setProperty(PutHiveStreaming.TABLE_NAME, "users");
+ runner.setProperty(PutHiveStreaming.TXNS_PER_BATCH, "100");
+ runner.setProperty(PutHiveStreaming.ROLLBACK_ON_FAILURE, "true");
+ runner.setValidateExpressionUsage(false);
+ Map<String, Object> user1 = new HashMap<String, Object>() {
+ {
+ put("name", "Joe");
+ put("favorite_number", 146);
+ }
+ };
+ Map<String, Object> user2 = new HashMap<String, Object>() {
+ {
+ put("name", "Mary");
+ put("favorite_number", 42);
+ }
+ };
+ runner.enqueue(createAvroRecord(Arrays.asList(user1, user2)));
+ try {
+ runner.run();
+ fail("ProcessException should be thrown");
+ } catch (AssertionError e) {
+ assertTrue(e.getCause() instanceof ProcessException);
+ }
+
+ runner.assertTransferCount(PutHiveStreaming.REL_FAILURE, 0);
+ // Assert incoming FlowFile stays in input queue.
+ assertEquals(1, runner.getQueueSize().getObjectCount());
+ }
+
+ @Test
public void onTriggerWithSerializationError() throws Exception {
processor.setGenerateSerializationError(true);
runner.setProperty(PutHiveStreaming.METASTORE_URI, "thrift://localhost:9083");
@@ -458,6 +787,35 @@ public class TestPutHiveStreaming {
}
@Test
+ public void onTriggerWithSerializationErrorRollbackOnFailure() throws Exception {
+ processor.setGenerateSerializationError(true);
+ runner.setProperty(PutHiveStreaming.METASTORE_URI, "thrift://localhost:9083");
+ runner.setProperty(PutHiveStreaming.DB_NAME, "default");
+ runner.setProperty(PutHiveStreaming.TABLE_NAME, "users");
+ runner.setProperty(PutHiveStreaming.TXNS_PER_BATCH, "100");
+ runner.setProperty(PutHiveStreaming.ROLLBACK_ON_FAILURE, "true");
+ runner.setValidateExpressionUsage(false);
+ Map<String, Object> user1 = new HashMap<String, Object>() {
+ {
+ put("name", "Joe");
+ put("favorite_number", 146);
+ }
+ };
+ runner.enqueue(createAvroRecord(Collections.singletonList(user1)));
+ try {
+ runner.run();
+ fail("ProcessException should be thrown");
+ } catch (AssertionError e) {
+ assertTrue(e.getCause() instanceof ProcessException);
+ }
+
+ runner.assertTransferCount(PutHiveStreaming.REL_SUCCESS, 0);
+ runner.assertTransferCount(PutHiveStreaming.REL_FAILURE, 0);
+ // Assert incoming FlowFile stays in input queue.
+ assertEquals(1, runner.getQueueSize().getObjectCount());
+ }
+
+ @Test
public void onTriggerWithCommitFailure() throws Exception {
processor.setGenerateCommitFailure(true);
runner.setProperty(PutHiveStreaming.METASTORE_URI, "thrift://localhost:9083");
@@ -474,9 +832,39 @@ public class TestPutHiveStreaming {
runner.enqueue(createAvroRecord(Collections.singletonList(user1)));
runner.run();
- runner.assertTransferCount(PutHiveStreaming.REL_FAILURE, 1);
+ runner.assertTransferCount(PutHiveStreaming.REL_FAILURE, 0);
+ runner.assertTransferCount(PutHiveStreaming.REL_SUCCESS, 0);
+ runner.assertTransferCount(PutHiveStreaming.REL_RETRY, 1);
+ }
+
+ @Test
+ public void onTriggerWithCommitFailureRollbackOnFailure() throws Exception {
+ processor.setGenerateCommitFailure(true);
+ runner.setProperty(PutHiveStreaming.METASTORE_URI, "thrift://localhost:9083");
+ runner.setProperty(PutHiveStreaming.DB_NAME, "default");
+ runner.setProperty(PutHiveStreaming.TABLE_NAME, "users");
+ runner.setProperty(PutHiveStreaming.TXNS_PER_BATCH, "100");
+ runner.setProperty(PutHiveStreaming.ROLLBACK_ON_FAILURE, "true");
+ runner.setValidateExpressionUsage(false);
+ Map<String, Object> user1 = new HashMap<String, Object>() {
+ {
+ put("name", "Joe");
+ put("favorite_number", 146);
+ }
+ };
+ runner.enqueue(createAvroRecord(Collections.singletonList(user1)));
+ try {
+ runner.run();
+ fail("ProcessException should be thrown");
+ } catch (AssertionError e) {
+ assertTrue(e.getCause() instanceof ProcessException);
+ }
+
+ runner.assertTransferCount(PutHiveStreaming.REL_FAILURE, 0);
runner.assertTransferCount(PutHiveStreaming.REL_SUCCESS, 0);
runner.assertTransferCount(PutHiveStreaming.REL_RETRY, 0);
+ // Assert incoming FlowFile stays in input queue.
+ assertEquals(1, runner.getQueueSize().getObjectCount());
}
@Test
@@ -496,9 +884,39 @@ public class TestPutHiveStreaming {
runner.enqueue(createAvroRecord(Collections.singletonList(user1)));
runner.run();
- runner.assertTransferCount(PutHiveStreaming.REL_FAILURE, 1);
+ runner.assertTransferCount(PutHiveStreaming.REL_FAILURE, 0);
+ runner.assertTransferCount(PutHiveStreaming.REL_SUCCESS, 0);
+ runner.assertTransferCount(PutHiveStreaming.REL_RETRY, 1);
+ }
+
+ @Test
+ public void onTriggerWithTransactionFailureRollbackOnFailure() throws Exception {
+ processor.setGenerateTransactionFailure(true);
+ runner.setProperty(PutHiveStreaming.METASTORE_URI, "thrift://localhost:9083");
+ runner.setProperty(PutHiveStreaming.DB_NAME, "default");
+ runner.setProperty(PutHiveStreaming.TABLE_NAME, "users");
+ runner.setProperty(PutHiveStreaming.TXNS_PER_BATCH, "100");
+ runner.setProperty(PutHiveStreaming.ROLLBACK_ON_FAILURE, "true");
+ runner.setValidateExpressionUsage(false);
+ Map<String, Object> user1 = new HashMap<String, Object>() {
+ {
+ put("name", "Joe");
+ put("favorite_number", 146);
+ }
+ };
+ runner.enqueue(createAvroRecord(Collections.singletonList(user1)));
+ try {
+ runner.run();
+ fail("ProcessException should be thrown");
+ } catch (AssertionError e) {
+ assertTrue(e.getCause() instanceof ProcessException);
+ }
+
+ runner.assertTransferCount(PutHiveStreaming.REL_FAILURE, 0);
runner.assertTransferCount(PutHiveStreaming.REL_SUCCESS, 0);
runner.assertTransferCount(PutHiveStreaming.REL_RETRY, 0);
+ // Assert incoming FlowFile stays in input queue.
+ assertEquals(1, runner.getQueueSize().getObjectCount());
}
@Test
@@ -563,10 +981,12 @@ public class TestPutHiveStreaming {
private boolean generateConnectFailure = false;
private boolean generateInterruptedExceptionOnCreateWriter = false;
private boolean generateWriteFailure = false;
+ private Integer generateWriteFailureRecordIndex;
private boolean generateSerializationError = false;
private boolean generateCommitFailure = false;
private boolean generateTransactionFailure = false;
private boolean generateExceptionOnFlushAndClose = false;
+ private HiveEndPoint hiveEndPoint = mock(HiveEndPoint.class);
@Override
public KerberosProperties getKerberosProperties() {
@@ -579,7 +999,6 @@ public class TestPutHiveStreaming {
@Override
public HiveEndPoint makeHiveEndPoint(List<String> partitionValues, HiveOptions hiveOptions) {
- HiveEndPoint hiveEndPoint = mock(HiveEndPoint.class);
return hiveEndPoint;
}
@@ -593,7 +1012,7 @@ public class TestPutHiveStreaming {
throw new InterruptedException();
}
MockHiveWriter hiveWriter = new MockHiveWriter(endPoint, options.getTxnsPerBatch(), options.getAutoCreatePartitions(), options.getCallTimeOut(), callTimeoutPool, ugi, hiveConfig);
- hiveWriter.setGenerateWriteFailure(generateWriteFailure);
+ hiveWriter.setGenerateWriteFailure(generateWriteFailure, generateWriteFailureRecordIndex);
hiveWriter.setGenerateSerializationError(generateSerializationError);
hiveWriter.setGenerateCommitFailure(generateCommitFailure);
hiveWriter.setGenerateTransactionFailure(generateTransactionFailure);
@@ -613,6 +1032,11 @@ public class TestPutHiveStreaming {
this.generateWriteFailure = generateWriteFailure;
}
+ public void setGenerateWriteFailure(boolean generateWriteFailure, int generateWriteFailureRecordIndex) {
+ this.generateWriteFailure = generateWriteFailure;
+ this.generateWriteFailureRecordIndex = generateWriteFailureRecordIndex;
+ }
+
public void setGenerateSerializationError(boolean generateSerializationError) {
this.generateSerializationError = generateSerializationError;
}
@@ -634,10 +1058,13 @@ public class TestPutHiveStreaming {
private class MockHiveWriter extends HiveWriter {
private boolean generateWriteFailure = false;
+ private Integer generateWriteFailureRecordIndex;
private boolean generateSerializationError = false;
private boolean generateCommitFailure = false;
private boolean generateTransactionFailure = false;
private boolean generateExceptionOnFlushAndClose = false;
+ private int writeAttemptCount = 0;
+ private int totalRecords = 0;
private HiveEndPoint endPoint;
@@ -651,16 +1078,23 @@ public class TestPutHiveStreaming {
@Override
public synchronized void write(byte[] record) throws WriteFailure, SerializationError, InterruptedException {
- if (generateWriteFailure) {
- throw new HiveWriter.WriteFailure(endPoint, 1L, new Exception());
- }
- if (generateSerializationError) {
- throw new SerializationError("Test Serialization Error", new Exception());
+ try {
+ if (generateWriteFailure
+ && (generateWriteFailureRecordIndex == null || writeAttemptCount == generateWriteFailureRecordIndex)) {
+ throw new WriteFailure(endPoint, 1L, new Exception());
+ }
+ if (generateSerializationError) {
+ throw new SerializationError("Test Serialization Error", new Exception());
+ }
+ totalRecords++;
+ } finally {
+ writeAttemptCount++;
}
}
- public void setGenerateWriteFailure(boolean generateWriteFailure) {
+ public void setGenerateWriteFailure(boolean generateWriteFailure, Integer generateWriteFailureRecordIndex) {
this.generateWriteFailure = generateWriteFailure;
+ this.generateWriteFailureRecordIndex = generateWriteFailureRecordIndex;
}
public void setGenerateSerializationError(boolean generateSerializationError) {
@@ -754,6 +1188,11 @@ public class TestPutHiveStreaming {
protected void nextTxn(boolean rollToNext) throws StreamingException, InterruptedException, TxnBatchFailure {
// Empty
}
+
+ @Override
+ public int getTotalRecords() {
+ return totalRecords;
+ }
}
}
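The MockHiveWriter changes above let a test choose the record index at which a write failure is injected, which is what separates the two rollback-on-failure scenarios: failing before any Hive transaction is committed (session rolled back, ProcessException thrown) versus failing after one transaction has already been committed (no exception; committed records routed to 'success'). A condensed usage sketch of the committed case, taken from onTriggerMultipleRecordsFailInMiddleRollbackOnFailureCommitted above, looks like this:

    // With 2 records per transaction, a failure at record index 2 happens after the first
    // transaction (records 0 and 1) has already been committed, so the processor does not
    // throw; the committed records go to 'success' and the input FlowFile stays queued.
    runner.setProperty(PutHiveStreaming.RECORDS_PER_TXN, "2");
    runner.setProperty(PutHiveStreaming.ROLLBACK_ON_FAILURE, "true");
    processor.setGenerateWriteFailure(true, 2);   // fail while writing the 3rd record
    runner.enqueue(createAvroRecord(Arrays.asList(user1, user2, user3, user4)));
    runner.run();                                 // completes without an AssertionError
    runner.assertTransferCount(PutHiveStreaming.REL_SUCCESS, 1);
    assertEquals(1, runner.getQueueSize().getObjectCount());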
[4/5] nifi git commit: NIFI-3415: Add Rollback on Failure.
Posted by ma...@apache.org.
http://git-wip-us.apache.org/repos/asf/nifi/blob/d9acdb54/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/processors/hive/PutHiveQL.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/processors/hive/PutHiveQL.java b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/processors/hive/PutHiveQL.java
index 5eabfe8..b312327 100644
--- a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/processors/hive/PutHiveQL.java
+++ b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/processors/hive/PutHiveQL.java
@@ -24,14 +24,24 @@ import org.apache.nifi.annotation.behavior.ReadsAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.dbcp.hive.HiveDBCPService;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.ProcessSessionFactory;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.processor.util.pattern.ErrorTypes;
+import org.apache.nifi.processor.util.pattern.ExceptionHandler;
+import org.apache.nifi.processor.util.pattern.ExceptionHandler.OnError;
+import org.apache.nifi.processor.util.pattern.PartialFunctions.FetchFlowFiles;
+import org.apache.nifi.processor.util.pattern.PartialFunctions.InitConnection;
+import org.apache.nifi.processor.util.pattern.Put;
+import org.apache.nifi.processor.util.pattern.RollbackOnFailure;
+import org.apache.nifi.processor.util.pattern.RoutingResult;
import java.nio.charset.Charset;
import java.sql.Connection;
@@ -108,6 +118,7 @@ public class PutHiveQL extends AbstractHiveQLProcessor {
_propertyDescriptors.add(BATCH_SIZE);
_propertyDescriptors.add(CHARSET);
_propertyDescriptors.add(STATEMENT_DELIMITER);
+ _propertyDescriptors.add(RollbackOnFailure.ROLLBACK_ON_FAILURE);
propertyDescriptors = Collections.unmodifiableList(_propertyDescriptors);
Set<Relationship> _relationships = new HashSet<>();
@@ -117,6 +128,31 @@ public class PutHiveQL extends AbstractHiveQLProcessor {
relationships = Collections.unmodifiableSet(_relationships);
}
+ private Put<FunctionContext, Connection> process;
+ private ExceptionHandler<FunctionContext> exceptionHandler;
+
+ @OnScheduled
+ public void constructProcess() {
+ exceptionHandler = new ExceptionHandler<>();
+ exceptionHandler.mapException(e -> {
+ if (e instanceof SQLNonTransientException) {
+ return ErrorTypes.InvalidInput;
+ } else if (e instanceof SQLException) {
+ return ErrorTypes.TemporalFailure;
+ } else {
+ return ErrorTypes.UnknownFailure;
+ }
+ });
+ exceptionHandler.adjustError(RollbackOnFailure.createAdjustError(getLogger()));
+
+ process = new Put<>();
+ process.setLogger(getLogger());
+ process.initConnection(initConnection);
+ process.fetchFlowFiles(fetchFlowFiles);
+ process.putFlowFile(putFlowFile);
+ process.adjustRoute(RollbackOnFailure.createAdjustRoute(REL_FAILURE, REL_RETRY));
+ }
+
@Override
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
return propertyDescriptors;
@@ -127,75 +163,95 @@ public class PutHiveQL extends AbstractHiveQLProcessor {
return relationships;
}
- @Override
- public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
- final int batchSize = context.getProperty(BATCH_SIZE).asInteger();
- final List<FlowFile> flowFiles = session.get(batchSize);
+ private class FunctionContext extends RollbackOnFailure {
+ final Charset charset;
+ final String statementDelimiter;
+ final long startNanos = System.nanoTime();
+
+ String connectionUrl;
+
- if (flowFiles.isEmpty()) {
- return;
+ private FunctionContext(boolean rollbackOnFailure, Charset charset, String statementDelimiter) {
+ super(rollbackOnFailure, false);
+ this.charset = charset;
+ this.statementDelimiter = statementDelimiter;
}
+ }
- final long startNanos = System.nanoTime();
- final Charset charset = Charset.forName(context.getProperty(CHARSET).getValue());
+ private InitConnection<FunctionContext, Connection> initConnection = (context, session, fc) -> {
final HiveDBCPService dbcpService = context.getProperty(HIVE_DBCP_SERVICE).asControllerService(HiveDBCPService.class);
- final String statementDelimiter = context.getProperty(STATEMENT_DELIMITER).getValue();
+ final Connection connection = dbcpService.getConnection();
+ fc.connectionUrl = dbcpService.getConnectionURL();
+ return connection;
+ };
- try (final Connection conn = dbcpService.getConnection()) {
+ private FetchFlowFiles<FunctionContext> fetchFlowFiles = (context, session, functionContext, result) -> {
+ final int batchSize = context.getProperty(BATCH_SIZE).asInteger();
+ return session.get(batchSize);
+ };
- for (FlowFile flowFile : flowFiles) {
- try {
- final String script = getHiveQL(session, flowFile, charset);
- String regex = "(?<!\\\\)" + Pattern.quote(statementDelimiter);
+ private Put.PutFlowFile<FunctionContext, Connection> putFlowFile = (context, session, fc, conn, flowFile, result) -> {
+ final String script = getHiveQL(session, flowFile, fc.charset);
+ String regex = "(?<!\\\\)" + Pattern.quote(fc.statementDelimiter);
- String[] hiveQLs = script.split(regex);
+ String[] hiveQLs = script.split(regex);
- int loc = 1;
- for (String hiveQL: hiveQLs) {
- getLogger().debug("HiveQL: {}", new Object[]{hiveQL});
+ exceptionHandler.execute(fc, flowFile, input -> {
+ int loc = 1;
+ for (String hiveQL: hiveQLs) {
+ getLogger().debug("HiveQL: {}", new Object[]{hiveQL});
- if (!StringUtils.isEmpty(hiveQL.trim())) {
- final PreparedStatement stmt = conn.prepareStatement(hiveQL.trim());
+ if (!StringUtils.isEmpty(hiveQL.trim())) {
+ final PreparedStatement stmt = conn.prepareStatement(hiveQL.trim());
- // Get ParameterMetadata
- // Hive JDBC Doesn't support this yet:
- // ParameterMetaData pmd = stmt.getParameterMetaData();
- // int paramCount = pmd.getParameterCount();
+ // Get ParameterMetadata
+ // Hive JDBC Doesn't support this yet:
+ // ParameterMetaData pmd = stmt.getParameterMetaData();
+ // int paramCount = pmd.getParameterCount();
+ int paramCount = StringUtils.countMatches(hiveQL, "?");
- int paramCount = StringUtils.countMatches(hiveQL, "?");
+ if (paramCount > 0) {
+ loc = setParameters(loc, stmt, paramCount, flowFile.getAttributes());
+ }
- if (paramCount > 0) {
- loc = setParameters(loc, stmt, paramCount, flowFile.getAttributes());
- }
+ // Execute the statement
+ stmt.execute();
+ fc.proceed();
+ }
+ }
- // Execute the statement
- stmt.execute();
- }
- }
- // Emit a Provenance SEND event
- final long transmissionMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
+ // Emit a Provenance SEND event
+ final long transmissionMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - fc.startNanos);
- session.getProvenanceReporter().send(flowFile, dbcpService.getConnectionURL(), transmissionMillis, true);
- session.transfer(flowFile, REL_SUCCESS);
+ session.getProvenanceReporter().send(flowFile, fc.connectionUrl, transmissionMillis, true);
+ result.routeTo(flowFile, REL_SUCCESS);
- } catch (final SQLException e) {
+ }, onFlowFileError(context, session, result));
- if (e instanceof SQLNonTransientException) {
- getLogger().error("Failed to update Hive for {} due to {}; routing to failure", new Object[]{flowFile, e});
- session.transfer(flowFile, REL_FAILURE);
- } else {
- getLogger().error("Failed to update Hive for {} due to {}; it is possible that retrying the operation will succeed, so routing to retry", new Object[]{flowFile, e});
- flowFile = session.penalize(flowFile);
- session.transfer(flowFile, REL_RETRY);
- }
+ };
- }
+ private OnError<FunctionContext, FlowFile> onFlowFileError(final ProcessContext context, final ProcessSession session, final RoutingResult result) {
+ OnError<FunctionContext, FlowFile> onFlowFileError = ExceptionHandler.createOnError(context, session, result, REL_FAILURE, REL_RETRY);
+ onFlowFileError = onFlowFileError.andThen((c, i, r, e) -> {
+ switch (r.destination()) {
+ case Failure:
+ getLogger().error("Failed to update Hive for {} due to {}; routing to failure", new Object[] {i, e}, e);
+ break;
+ case Retry:
+ getLogger().error("Failed to update Hive for {} due to {}; it is possible that retrying the operation will succeed, so routing to retry",
+ new Object[] {i, e}, e);
+ break;
}
- } catch (final SQLException sqle) {
- // There was a problem getting the connection, yield and retry the flowfiles
- getLogger().error("Failed to get Hive connection due to {}; it is possible that retrying the operation will succeed, so routing to retry", new Object[]{sqle});
- session.transfer(flowFiles, REL_RETRY);
- context.yield();
- }
+ });
+ return RollbackOnFailure.createOnError(onFlowFileError);
+ }
+
+ @Override
+ public void onTrigger(ProcessContext context, ProcessSessionFactory sessionFactory) throws ProcessException {
+ final Boolean rollbackOnFailure = context.getProperty(RollbackOnFailure.ROLLBACK_ON_FAILURE).asBoolean();
+ final Charset charset = Charset.forName(context.getProperty(CHARSET).getValue());
+ final String statementDelimiter = context.getProperty(STATEMENT_DELIMITER).getValue();
+ final FunctionContext functionContext = new FunctionContext(rollbackOnFailure, charset, statementDelimiter);
+ RollbackOnFailure.onTrigger(context, sessionFactory, functionContext, getLogger(), session -> process.onTrigger(context, session, functionContext));
}
}
\ No newline at end of file
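The PutHiveQL rewrite above moves all error handling into the reusable ExceptionHandler / RollbackOnFailure pattern: exceptions are first mapped to an ErrorTypes category, and the adjustError step escalates those categories into a rollback when the new property is enabled. The sketch below, based only on the calls visible in the diff above, shows how another Put-style processor could wire up the same handler; everything outside the pattern classes is illustrative.

    // Illustrative sketch of reusing the pattern classes introduced by this commit.
    private ExceptionHandler<FunctionContext> handler;

    @OnScheduled
    public void setupHandler() {
        handler = new ExceptionHandler<>();
        handler.mapException(e -> {
            if (e instanceof SQLNonTransientException) {
                return ErrorTypes.InvalidInput;      // normally routed to 'failure'
            } else if (e instanceof SQLException) {
                return ErrorTypes.TemporalFailure;   // normally routed to 'retry'
            }
            return ErrorTypes.UnknownFailure;
        });
        // When Rollback on Failure is enabled, adjustError escalates these results so that
        // onTrigger throws a ProcessException and the session is rolled back instead of
        // transferring the FlowFile to 'failure' or 'retry'.
        handler.adjustError(RollbackOnFailure.createAdjustError(getLogger()));
    }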
http://git-wip-us.apache.org/repos/asf/nifi/blob/d9acdb54/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/processors/hive/PutHiveStreaming.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/processors/hive/PutHiveStreaming.java b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/processors/hive/PutHiveStreaming.java
index 1494595..2754f9c 100644
--- a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/processors/hive/PutHiveStreaming.java
+++ b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/processors/hive/PutHiveStreaming.java
@@ -17,7 +17,6 @@
package org.apache.nifi.processors.hive;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
-import org.apache.avro.Schema;
import org.apache.avro.file.CodecFactory;
import org.apache.avro.file.DataFileConstants;
import org.apache.avro.file.DataFileStream;
@@ -46,34 +45,38 @@ import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.hadoop.KerberosProperties;
import org.apache.nifi.hadoop.SecurityUtil;
import org.apache.nifi.logging.ComponentLog;
-import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.AbstractSessionFactoryProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.ProcessSessionFactory;
import org.apache.nifi.processor.ProcessorInitializationContext;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.processor.util.pattern.DiscontinuedException;
+import org.apache.nifi.processor.util.pattern.ErrorTypes;
+import org.apache.nifi.processor.util.pattern.ExceptionHandler;
+import org.apache.nifi.processor.util.pattern.RollbackOnFailure;
+import org.apache.nifi.processor.util.pattern.RoutingResult;
import org.apache.nifi.util.hive.AuthenticationFailedException;
import org.apache.nifi.util.hive.HiveConfigurator;
import org.apache.nifi.util.hive.HiveOptions;
import org.apache.nifi.util.hive.HiveUtils;
import org.apache.nifi.util.hive.HiveWriter;
-import org.json.JSONException;
-import org.json.JSONObject;
import java.io.File;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
-import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
-import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.Timer;
import java.util.TimerTask;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@@ -81,6 +84,8 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.BiFunction;
+import java.util.function.Function;
import java.util.regex.Pattern;
/**
@@ -96,7 +101,7 @@ import java.util.regex.Pattern;
+ "and 'failure' relationships, and contains the number of records from the incoming flow file written successfully and unsuccessfully, respectively.")
})
@RequiresInstanceClassLoading
-public class PutHiveStreaming extends AbstractProcessor {
+public class PutHiveStreaming extends AbstractSessionFactoryProcessor {
// Attributes
public static final String HIVE_STREAMING_RECORD_COUNT_ATTR = "hivestreaming.record.count";
@@ -231,21 +236,27 @@ public class PutHiveStreaming extends AbstractProcessor {
.defaultValue("10000")
.build();
+ public static final PropertyDescriptor ROLLBACK_ON_FAILURE = RollbackOnFailure.createRollbackOnFailureProperty(
+ "NOTE: When an error occurred after a Hive streaming transaction which is derived from the same input FlowFile is already committed," +
+ " (i.e. a FlowFile contains more records than 'Records per Transaction' and a failure occurred at the 2nd transaction or later)" +
+ " then the succeeded records will be transferred to 'success' relationship while the original input FlowFile stays in incoming queue." +
+ " Duplicated records can be created for the succeeded ones when the same FlowFile is processed again.");
+
// Relationships
public static final Relationship REL_SUCCESS = new Relationship.Builder()
.name("success")
- .description("A FlowFile containing the JSON contents of a record is routed to this relationship after the record has been successfully transmitted to Hive.")
+ .description("A FlowFile containing Avro records routed to this relationship after the record has been successfully transmitted to Hive.")
.build();
public static final Relationship REL_FAILURE = new Relationship.Builder()
.name("failure")
- .description("A FlowFile containing the JSON contents of a record is routed to this relationship if the record could not be transmitted to Hive.")
+ .description("A FlowFile containing Avro records routed to this relationship if the record could not be transmitted to Hive.")
.build();
public static final Relationship REL_RETRY = new Relationship.Builder()
.name("retry")
.description("The incoming FlowFile is routed to this relationship if its records cannot be transmitted to Hive. Note that "
- + "some records may have been processed successfully, they will be routed (as JSON flow files) to the success relationship. "
+ + "some records may have been processed successfully, they will be routed (as Avro flow files) to the success relationship. "
+ "The combination of the retry, success, and failure relationships indicate how many records succeeded and/or failed. This "
+ "can be used to provide a retry capability since full rollback is not possible.")
.build();
@@ -283,6 +294,7 @@ public class PutHiveStreaming extends AbstractProcessor {
props.add(HEARTBEAT_INTERVAL);
props.add(TXNS_PER_BATCH);
props.add(RECORDS_PER_TXN);
+ props.add(ROLLBACK_ON_FAILURE);
kerberosConfigFile = context.getKerberosConfigurationFile();
kerberosProperties = new KerberosProperties(kerberosConfigFile);
@@ -364,8 +376,213 @@ public class PutHiveStreaming extends AbstractProcessor {
setupHeartBeatTimer();
}
+ private static class FunctionContext extends RollbackOnFailure {
+
+ private FlowFile inputFlowFile;
+ private AtomicReference<FlowFile> successFlowFile;
+ private AtomicReference<FlowFile> failureFlowFile;
+ private final DataFileWriter<GenericRecord> successAvroWriter = new DataFileWriter<>(new GenericDatumWriter<GenericRecord>());
+ private final DataFileWriter<GenericRecord> failureAvroWriter = new DataFileWriter<>(new GenericDatumWriter<GenericRecord>());
+
+ private final AtomicInteger recordCount = new AtomicInteger(0);
+ private final AtomicInteger successfulRecordCount = new AtomicInteger(0);
+ private final AtomicInteger failedRecordCount = new AtomicInteger(0);
+
+ private volatile ExecutorService appendRecordThreadPool;
+ private volatile AtomicBoolean closed = new AtomicBoolean(false);
+ private final BlockingQueue<List<HiveStreamingRecord>> successRecordQueue = new ArrayBlockingQueue<>(100);
+ private final BlockingQueue<List<HiveStreamingRecord>> failureRecordQueue = new ArrayBlockingQueue<>(100);
+
+ private final ComponentLog logger;
+
+ /**
+ * It's possible that multiple Hive streaming transactions are committed within a single onTrigger.
+ * PutHiveStreaming onTrigger is not 'transactional' in the sense of RollbackOnFailure.
+ * Once a Hive streaming transaction is committed, the processor session will not be rolled back.
+ * @param rollbackOnFailure whether the process session should be rolled back when a failure occurs
+ */
+ private FunctionContext(boolean rollbackOnFailure, ComponentLog logger) {
+ super(rollbackOnFailure, false);
+ this.logger = logger;
+ }
+
+ private void setFlowFiles(FlowFile inputFlowFile, FlowFile successFlowFile, FlowFile failureFlowFile) {
+ this.inputFlowFile = inputFlowFile;
+ this.successFlowFile = new AtomicReference<>(successFlowFile);
+ this.failureFlowFile = new AtomicReference<>(failureFlowFile);
+ }
+
+ private void initAvroWriter(ProcessSession session, String codec, DataFileStream<GenericRecord> reader,
+ DataFileWriter<GenericRecord> writer, AtomicReference<FlowFile> flowFileRef,
+ BlockingQueue<List<HiveStreamingRecord>> queue, Function<Integer, Boolean> isCompleted) {
+
+ writer.setCodec(CodecFactory.fromString(codec));
+ // Transfer metadata (this is a subset of the incoming file)
+ for (String metaKey : reader.getMetaKeys()) {
+ if (!RESERVED_METADATA.contains(metaKey)) {
+ writer.setMeta(metaKey, reader.getMeta(metaKey));
+ }
+ }
+
+ appendRecordThreadPool.submit(() -> {
+ flowFileRef.set(session.append(flowFileRef.get(), (out) -> {
+ // Create writer so that records can be appended.
+ writer.create(reader.getSchema(), out);
+
+ try {
+ int writtenCount = 0;
+ while (true) {
+
+ if (closed.get() && isCompleted.apply(writtenCount)) {
+ break;
+ }
+
+ final List<HiveStreamingRecord> hRecords = queue.poll(100, TimeUnit.MILLISECONDS);
+ if (hRecords != null) {
+ try {
+ for (HiveStreamingRecord hRecord : hRecords) {
+ writer.append(hRecord.getRecord());
+ writtenCount++;
+ }
+ } catch (IOException ioe) {
+ // The records were put to Hive Streaming successfully, but there was an error while writing the
+ // Avro records to the flow file. Log as an error and move on.
+ logger.error("Error writing Avro records (which were sent successfully to Hive Streaming) to the flow file, " + ioe, ioe);
+ }
+ }
+ }
+ writer.flush();
+ } catch (InterruptedException e) {
+ logger.warn("Append record thread is interrupted, " + e, e);
+ }
+
+ }));
+ });
+ }
+
+ private void initAvroWriters(ProcessSession session, String codec, DataFileStream<GenericRecord> reader) {
+ appendRecordThreadPool = Executors.newFixedThreadPool(2);
+ initAvroWriter(session, codec, reader, successAvroWriter, successFlowFile, successRecordQueue, w -> w == successfulRecordCount.get());
+ initAvroWriter(session, codec, reader, failureAvroWriter, failureFlowFile, failureRecordQueue, w -> w == failedRecordCount.get());
+
+ // No new tasks will be submitted.
+ appendRecordThreadPool.shutdown();
+ }
+
+ private void appendRecordsToSuccess(List<HiveStreamingRecord> records) {
+ appendRecordsToFlowFile(records, successRecordQueue);
+ successfulRecordCount.addAndGet(records.size());
+ }
+
+ private void appendRecordsToFailure(List<HiveStreamingRecord> records) {
+ appendRecordsToFlowFile(records, failureRecordQueue);
+ failedRecordCount.addAndGet(records.size());
+ }
+
+ private void appendRecordsToFlowFile(List<HiveStreamingRecord> records, BlockingQueue<List<HiveStreamingRecord>> queue) {
+ if (!queue.add(records)) {
+ throw new ProcessException(String.format("Failed to append %d records due to insufficient internal queue capacity.", records.size()));
+ }
+ }
+
+ private void transferFlowFiles(ProcessSession session, RoutingResult result, String transitUri) {
+
+ closeAvroWriters();
+
+ if (successfulRecordCount.get() > 0) {
+ // Transfer the flow file with successful records
+ successFlowFile.set(
+ session.putAttribute(successFlowFile.get(), HIVE_STREAMING_RECORD_COUNT_ATTR, Integer.toString(successfulRecordCount.get())));
+ session.getProvenanceReporter().send(successFlowFile.get(), transitUri);
+ result.routeTo(successFlowFile.get(), REL_SUCCESS);
+ } else {
+ session.remove(successFlowFile.get());
+ }
+
+ if (failedRecordCount.get() > 0) {
+ // There were some failed records, so transfer that flow file to failure
+ failureFlowFile.set(
+ session.putAttribute(failureFlowFile.get(), HIVE_STREAMING_RECORD_COUNT_ATTR, Integer.toString(failedRecordCount.get())));
+ result.routeTo(failureFlowFile.get(), REL_FAILURE);
+ } else {
+ session.remove(failureFlowFile.get());
+ }
+
+ result.getRoutedFlowFiles().forEach((relationship, flowFiles) -> {
+ session.transfer(flowFiles, relationship);
+ });
+ }
+
+ private void closeAvroWriters() {
+ closed.set(true);
+ if (appendRecordThreadPool != null) {
+ // A null thread pool means the input FlowFile was not processed at all due to an invalid format.
+ try {
+ if (!appendRecordThreadPool.awaitTermination(10, TimeUnit.SECONDS)) {
+ logger.warn("Waiting for Avro records being appended into output FlowFiles has been timeout.");
+ }
+ } catch (InterruptedException e) {
+ logger.warn("Waiting for Avro records being appended into output FlowFiles has been interrupted.");
+ }
+ }
+ }
+ }
+
+ private static class ShouldRetryException extends RuntimeException {
+ private ShouldRetryException(String message, Throwable cause) {
+ super(message, cause);
+ }
+ }
+
+ private ExceptionHandler.OnError<FunctionContext, List<HiveStreamingRecord>> onHiveRecordsError(ProcessContext context, ProcessSession session) {
+ return RollbackOnFailure.createOnError((fc, input, res, e) -> {
+
+ if (res.penalty() == ErrorTypes.Penalty.Yield) {
+ context.yield();
+ }
+
+ switch (res.destination()) {
+ case Failure:
+ // Add the failed record to the failure flow file
+ getLogger().error(String.format("Error writing %s to Hive Streaming transaction due to %s", input, e), e);
+ fc.appendRecordsToFailure(input);
+ break;
+
+ case Retry:
+ // If we can't connect to the endpoint, exit the loop and let the outer exception handler route the original flow file to retry
+ abortAndCloseWriters();
+ throw new ShouldRetryException("Hive Streaming connect/write error, flow file will be penalized and routed to retry. " + e, e);
+
+ case Self:
+ abortAndCloseWriters();
+ break;
+
+ default:
+ abortAndCloseWriters();
+ if (e instanceof ProcessException) {
+ throw (ProcessException) e;
+ } else {
+ throw new ProcessException(String.format("Error writing %s to Hive Streaming transaction due to %s", input, e), e);
+ }
+ }
+ });
+ }
+
+ private ExceptionHandler.OnError<FunctionContext, HiveStreamingRecord> onHiveRecordError(ProcessContext context, ProcessSession session) {
+ return (fc, input, res, e) -> onHiveRecordsError(context, session).apply(fc, Collections.singletonList(input), res, e);
+ }
+
+ private ExceptionHandler.OnError<FunctionContext, GenericRecord> onRecordError(ProcessContext context, ProcessSession session) {
+ return (fc, input, res, e) -> onHiveRecordError(context, session).apply(fc, new HiveStreamingRecord(null, input), res, e);
+ }
+
@Override
- public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
+ public void onTrigger(ProcessContext context, ProcessSessionFactory sessionFactory) throws ProcessException {
+ final FunctionContext functionContext = new FunctionContext(context.getProperty(ROLLBACK_ON_FAILURE).asBoolean(), getLogger());
+ RollbackOnFailure.onTrigger(context, sessionFactory, functionContext, getLogger(), session -> onTrigger(context, session, functionContext));
+ }
+
+ private void onTrigger(ProcessContext context, ProcessSession session, FunctionContext functionContext) throws ProcessException {
FlowFile flowFile = session.get();
if (flowFile == null) {
return;
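Since the public onTrigger above now only builds a FunctionContext and delegates, a condensed sketch of that delegation may help when reading the rest of this hunk. It uses only calls that appear in this patch; the comments describe the intended behavior rather than a new API, and should be read as an assumption-laden summary, not authoritative documentation.

    @Override
    public void onTrigger(ProcessContext context, ProcessSessionFactory sessionFactory) throws ProcessException {
        // Per-trigger state that remembers whether the 'Rollback On Failure' property is enabled.
        final FunctionContext functionContext =
                new FunctionContext(context.getProperty(ROLLBACK_ON_FAILURE).asBoolean(), getLogger());
        // RollbackOnFailure.onTrigger obtains a session from the factory, runs the callback, and,
        // when rollback is requested and no Hive transaction has been committed yet, rolls the
        // session back so the input FlowFile stays in the incoming queue.
        RollbackOnFailure.onTrigger(context, sessionFactory, functionContext, getLogger(),
                session -> onTrigger(context, session, functionContext));
    }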
@@ -390,22 +607,58 @@ public class PutHiveStreaming extends AbstractProcessor {
}
}
- final AtomicInteger recordCount = new AtomicInteger(0);
- final AtomicInteger successfulRecordCount = new AtomicInteger(0);
- List<HiveStreamingRecord> successfulRecords = new LinkedList<>();
+ final AtomicReference<List<HiveStreamingRecord>> successfulRecords = new AtomicReference<>();
+ successfulRecords.set(new ArrayList<>());
final FlowFile inputFlowFile = flowFile;
- final AtomicBoolean processingFailure = new AtomicBoolean(false);
+
+ final RoutingResult result = new RoutingResult();
+ final ExceptionHandler<FunctionContext> exceptionHandler = new ExceptionHandler<>();
+ exceptionHandler.mapException(s -> {
+ try {
+ if (s == null) {
+ return ErrorTypes.PersistentFailure;
+ }
+ throw s;
+
+ } catch (IllegalArgumentException
+ | HiveWriter.WriteFailure
+ | SerializationError inputError) {
+
+ return ErrorTypes.InvalidInput;
+
+ } catch (HiveWriter.CommitFailure
+ | HiveWriter.TxnBatchFailure
+ | HiveWriter.TxnFailure writerTxError) {
+
+ return ErrorTypes.TemporalInputFailure;
+
+ } catch (ConnectionError
+ | HiveWriter.ConnectFailure connectionError) {
+ // Can't connect to Hive endpoint.
+ log.error("Error connecting to Hive endpoint: table {} at {}",
+ new Object[]{options.getTableName(), options.getMetaStoreURI()});
+
+ return ErrorTypes.TemporalFailure;
+
+ } catch (IOException
+ | InterruptedException tempError) {
+ return ErrorTypes.TemporalFailure;
+
+ } catch (Exception t) {
+ return ErrorTypes.UnknownFailure;
+ }
+ });
+ final BiFunction<FunctionContext, ErrorTypes, ErrorTypes.Result> adjustError = RollbackOnFailure.createAdjustError(getLogger());
+ exceptionHandler.adjustError(adjustError);
// Create output flow files and their Avro writers
- AtomicReference<FlowFile> successFlowFile = new AtomicReference<>(session.create(inputFlowFile));
- final DataFileWriter<GenericRecord> successAvroWriter = new DataFileWriter<>(new GenericDatumWriter<GenericRecord>());
- AtomicReference<FlowFile> failureFlowFile = new AtomicReference<>(session.create(inputFlowFile));
- final DataFileWriter<GenericRecord> failureAvroWriter = new DataFileWriter<>(new GenericDatumWriter<GenericRecord>());
+ functionContext.setFlowFiles(inputFlowFile, session.create(inputFlowFile), session.create(inputFlowFile));
try {
session.read(inputFlowFile, in -> {
try (final DataFileStream<GenericRecord> reader = new DataFileStream<>(in, new GenericDatumReader<GenericRecord>())) {
+
GenericRecord currRecord = null;
// Copy codec and schema information to all writers
@@ -413,239 +666,133 @@ public class PutHiveStreaming extends AbstractProcessor {
? DataFileConstants.NULL_CODEC
: reader.getMetaString(DataFileConstants.CODEC);
- Arrays.asList(successAvroWriter, failureAvroWriter)
- .forEach((writer) -> {
- writer.setCodec(CodecFactory.fromString(codec));
- // Transfer metadata (this is a subset of the incoming file)
- for (String metaKey : reader.getMetaKeys()) {
- if (!RESERVED_METADATA.contains(metaKey)) {
- writer.setMeta(metaKey, reader.getMeta(metaKey));
- }
- }
- });
+ functionContext.initAvroWriters(session, codec, reader);
+
+ Runnable flushSuccessfulRecords = () -> {
+ // Now send the records to the successful FlowFile and update the success count
+ functionContext.appendRecordsToSuccess(successfulRecords.get());
+ // Clear the list of successful records, we'll use it at the end when we flush whatever records are left
+ successfulRecords.set(new ArrayList<>());
+ };
while (reader.hasNext()) {
- currRecord = reader.next(currRecord);
- recordCount.incrementAndGet();
+ // We can NOT reuse currRecord here, because currRecord is accumulated in successful records.
+ // If we use the same GenericRecord instance, every record ends up having the same contents.
+ // To avoid this, we need to create a brand new GenericRecord instance here each time.
+ currRecord = reader.next();
+ functionContext.recordCount.incrementAndGet();
// Extract the partition values (they must be put separately into the Hive Streaming API)
List<String> partitionValues = new ArrayList<>();
- try {
+ if (!exceptionHandler.execute(functionContext, currRecord, input -> {
for (String partition : partitionColumnList) {
- Object partitionValue = currRecord.get(partition);
+ Object partitionValue = input.get(partition);
if (partitionValue == null) {
- throw new IOException("Partition column '" + partition + "' not found in Avro record");
+ throw new IllegalArgumentException("Partition column '" + partition + "' not found in Avro record");
}
partitionValues.add(partitionValue.toString());
}
- } catch (IOException ioe) {
- // Add the failed record to the failure flow file
- log.error("Error writing record to Hive Streaming transaction", ioe);
- appendRecordsToFlowFile(session, Collections.singletonList(new HiveStreamingRecord(null, currRecord)),
- failureFlowFile, failureAvroWriter, reader);
+ }, onRecordError(context, session))) {
continue;
}
- List<Schema.Field> fields = currRecord.getSchema().getFields();
- if (fields != null) {
- JSONObject obj = new JSONObject();
- try {
- for (Schema.Field field : fields) {
- String fieldName = field.name();
- // Skip fields that are partition columns, we extracted those values above to create an EndPoint
- if (!partitionColumnList.contains(fieldName)) {
- Object value = currRecord.get(fieldName);
- try {
- obj.put(fieldName, value);
- } catch (JSONException je) {
- throw new IOException(je);
- }
- }
- }
- } catch (IOException ioe) {
- // This really shouldn't happen since we are iterating over the schema fields, but just in case,
- // add the failed record to the failure flow file.
- log.error("Error writing record to Hive Streaming transaction", ioe);
- appendRecordsToFlowFile(session, Collections.singletonList(new HiveStreamingRecord(null, currRecord)),
- failureFlowFile, failureAvroWriter, reader);
- continue;
- }
- final HiveStreamingRecord record = new HiveStreamingRecord(partitionValues, currRecord);
- HiveEndPoint endPoint = null;
- HiveWriter hiveWriter = null;
- try {
- endPoint = makeHiveEndPoint(record.getPartitionValues(), options);
- hiveWriter = getOrCreateWriter(endPoint);
- } catch (ConnectionError
- | HiveWriter.ConnectFailure
- | InterruptedException connectionError) {
- // Can't connect to Hive endpoint.
- log.error("Error connecting to Hive endpoint: table {} at {}",
- new Object[]{options.getTableName(), options.getMetaStoreURI()});
- // If we can't connect to the endpoint, exit the loop and let the outer exception handler route the original flow file to retry
- abortAndCloseWriters();
- throw new ProcessException(connectionError);
- }
- try {
- try {
- hiveWriter.write(record.getRecord().toString().getBytes(StandardCharsets.UTF_8));
- successfulRecords.add(record);
- } catch (InterruptedException | HiveWriter.WriteFailure wf) {
- // Add the failed record to the failure flow file
- log.error("Error writing record to Hive Streaming transaction", wf);
- appendRecordsToFlowFile(session, Collections.singletonList(record), failureFlowFile, failureAvroWriter, reader);
- }
+ final HiveStreamingRecord record = new HiveStreamingRecord(partitionValues, currRecord);
+ final AtomicReference<HiveWriter> hiveWriterRef = new AtomicReference<>();
- // If we've reached the records-per-transaction limit, flush the Hive Writer and update the Avro Writer for successful records
- if (hiveWriter.getTotalRecords() >= recordsPerTxn) {
- hiveWriter.flush(true);
- // Now send the records to the success relationship and update the success count
- try {
- appendRecordsToFlowFile(session, successfulRecords, successFlowFile, successAvroWriter, reader);
- successfulRecordCount.accumulateAndGet(successfulRecords.size(), (current, incr) -> current + incr);
-
- // Clear the list of successful records, we'll use it at the end when we flush whatever records are left
- successfulRecords.clear();
-
- } catch (IOException ioe) {
- // The records were put to Hive Streaming successfully, but there was an error while writing the
- // Avro records to the flow file. Log as an error and move on.
- getLogger().error("Error writing Avro records (which were sent successfully to Hive Streaming) to the flow file", ioe);
- }
- }
+ // Write record to Hive streaming
+ if (!exceptionHandler.execute(functionContext, record, input -> {
- } catch (InterruptedException
- | HiveWriter.CommitFailure
- | HiveWriter.TxnBatchFailure
- | HiveWriter.TxnFailure
- | SerializationError writeException) {
-
- log.error("Error writing record to Hive Streaming transaction", writeException);
- // Add the failed record to the failure flow file
- appendRecordsToFlowFile(session, Collections.singletonList(record), failureFlowFile, failureAvroWriter, reader);
-
- if (!(writeException instanceof SerializationError)) {
- try {
- hiveWriter.abort();
- } catch (Exception e) {
- // Can't even abort properly, throw a process exception
- throw new ProcessException(e);
- }
+ final HiveEndPoint endPoint = makeHiveEndPoint(record.getPartitionValues(), options);
+ final HiveWriter hiveWriter = getOrCreateWriter(endPoint);
+ hiveWriterRef.set(hiveWriter);
+
+ hiveWriter.write(record.getRecord().toString().getBytes(StandardCharsets.UTF_8));
+ successfulRecords.get().add(record);
+
+ }, onHiveRecordError(context, session))) {
+ continue;
+ }
+
+ // If we've reached the records-per-transaction limit, flush the Hive Writer and update the Avro Writer for successful records
+ final HiveWriter hiveWriter = hiveWriterRef.get();
+ if (hiveWriter.getTotalRecords() >= recordsPerTxn) {
+ exceptionHandler.execute(functionContext, successfulRecords.get(), input -> {
+
+ hiveWriter.flush(true);
+ // Advance the function context; the process session can no longer be rolled back.
+ functionContext.proceed();
+
+ // Now send the records to the success relationship and update the success count
+ flushSuccessfulRecords.run();
+
+ }, onHiveRecordsError(context, session).andThen((fc, input, res, commitException) -> {
+ // Reset hiveWriter for succeeding records.
+ switch (res.destination()) {
+ case Retry:
+ case Failure:
+ try {
+ // Abort current tx and move to next.
+ hiveWriter.abort();
+ } catch (Exception e) {
+ // Can't even abort properly, throw a process exception
+ throw new ProcessException(e);
+ }
}
- }
+ }));
}
}
- try {
+
+ exceptionHandler.execute(functionContext, successfulRecords.get(), input -> {
// Finish any transactions
flushAllWriters(true);
closeAllWriters();
// Now send any remaining records to the success relationship and update the count
- appendRecordsToFlowFile(session, successfulRecords, successFlowFile, successAvroWriter, reader);
- successfulRecordCount.accumulateAndGet(successfulRecords.size(), (current, incr) -> current + incr);
- successfulRecords.clear();
-
- } catch (HiveWriter.CommitFailure
- | HiveWriter.TxnBatchFailure
- | HiveWriter.TxnFailure
- | InterruptedException e) {
-
- // If any records are in the successfulRecords list but ended up here, then they actually weren't transferred successfully, so
- // route them to failure instead
- appendRecordsToFlowFile(session, successfulRecords, failureFlowFile, failureAvroWriter, reader);
- }
+ flushSuccessfulRecords.run();
+
+ // If the final flush fails, the error handler appends the remaining successfulRecords to the failure FlowFile.
+ }, onHiveRecordsError(context, session));
+
} catch (IOException ioe) {
// The Avro file is invalid (or may not be an Avro file at all), send it to failure
- log.error("The incoming flow file can not be read as an Avro file, routing to failure", ioe);
- processingFailure.set(true);
- }
- });
-
- if (recordCount.get() > 0) {
- if (successfulRecordCount.get() > 0) {
- // Transfer the flow file with successful records
- successFlowFile.set(
- session.putAttribute(successFlowFile.get(), HIVE_STREAMING_RECORD_COUNT_ATTR, Integer.toString(recordCount.get())));
- session.getProvenanceReporter().send(successFlowFile.get(), options.getMetaStoreURI());
- session.transfer(successFlowFile.get(), REL_SUCCESS);
- } else {
- session.remove(successFlowFile.get());
- }
+ final ErrorTypes.Result adjusted = adjustError.apply(functionContext, ErrorTypes.InvalidInput);
+ final String msg = "The incoming flow file can not be read as an Avro file";
+ switch (adjusted.destination()) {
+ case Failure:
+ log.error(msg, ioe);
+ result.routeTo(inputFlowFile, REL_FAILURE);
+ break;
+ case ProcessException:
+ throw new ProcessException(msg, ioe);
- if (recordCount.get() != successfulRecordCount.get()) {
- // There were some failed records, so transfer that flow file to failure
- failureFlowFile.set(
- session.putAttribute(failureFlowFile.get(), HIVE_STREAMING_RECORD_COUNT_ATTR,
- Integer.toString(recordCount.get() - successfulRecordCount.get())));
- session.transfer(failureFlowFile.get(), REL_FAILURE);
- } else {
- session.remove(failureFlowFile.get());
+ }
}
- } else {
- // No records were processed, so remove the output flow files
- session.remove(successFlowFile.get());
- session.remove(failureFlowFile.get());
- }
- successFlowFile.set(null);
- failureFlowFile.set(null);
+ });
// If we got here, we've processed the outgoing flow files correctly, so remove the incoming one if necessary
- if (processingFailure.get()) {
- session.transfer(inputFlowFile, REL_FAILURE);
- } else {
- session.remove(flowFile);
+ if (result.getRoutedFlowFiles().values().stream().noneMatch(routed -> routed.contains(inputFlowFile))) {
+ session.remove(inputFlowFile);
}
- } catch (ProcessException pe) {
- abortAndCloseWriters();
- Throwable t = pe.getCause();
- if (t != null) {
- if (t instanceof ConnectionError
- || t instanceof HiveWriter.ConnectFailure
- || t instanceof HiveWriter.CommitFailure
- || t instanceof HiveWriter.TxnBatchFailure
- || t instanceof HiveWriter.TxnFailure
- || t instanceof InterruptedException) {
- log.error("Hive Streaming connect/write error, flow file will be penalized and routed to retry", t);
- flowFile = session.penalize(flowFile);
- session.transfer(flowFile, REL_RETRY);
- // Remove the ones we created
- if (successFlowFile.get() != null) {
- session.remove(successFlowFile.get());
- }
- if (failureFlowFile.get() != null) {
- session.remove(failureFlowFile.get());
- }
- } else {
- throw pe;
- }
- } else {
- throw pe;
- }
+ } catch (DiscontinuedException e) {
+ // The input FlowFile processing is discontinued. Keep it in the input queue.
+ getLogger().warn("Discontinued processing for {} due to {}", new Object[]{flowFile, e}, e);
+ result.routeTo(flowFile, Relationship.SELF);
+
+ } catch (ShouldRetryException e) {
+ // This exception is already a result of adjusting an error, so simply transfer the FlowFile to retry.
+ getLogger().error(e.getMessage(), e);
+ flowFile = session.penalize(flowFile);
+ result.routeTo(flowFile, REL_RETRY);
+
} finally {
+ functionContext.transferFlowFiles(session, result, options.getMetaStoreURI());
// Restore original class loader, might not be necessary but is good practice since the processor task changed it
Thread.currentThread().setContextClassLoader(originalClassloader);
}
}
- private void appendRecordsToFlowFile(ProcessSession session,
- List<HiveStreamingRecord> records,
- AtomicReference<FlowFile> appendFlowFile,
- DataFileWriter<GenericRecord> avroWriter,
- DataFileStream<GenericRecord> reader) throws IOException {
-
- appendFlowFile.set(session.append(appendFlowFile.get(), (out) -> {
-
- try (DataFileWriter<GenericRecord> writer = avroWriter.create(reader.getSchema(), out)) {
- for (HiveStreamingRecord sRecord : records) {
- writer.append(sRecord.getRecord());
- }
- writer.flush();
- }
- }));
- }
-
@OnStopped
public void cleanup() {
ComponentLog log = getLogger();
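For readers new to the ExceptionHandler pattern used in the onTrigger rewrite above, the error classification boils down to a small amount of wiring. The sketch below is not part of the patch; it restates the mapException/adjustError setup with simplified categories, and the comments are interpretive. ExceptionHandler, ErrorTypes and RollbackOnFailure.createAdjustError are used as they appear in the diff; everything else is illustrative.

    final ExceptionHandler<FunctionContext> handler = new ExceptionHandler<>();
    // Map a thrown exception to a coarse error category.
    handler.mapException(e -> {
        if (e == null) {
            return ErrorTypes.PersistentFailure;   // no exception available
        } else if (e instanceof IllegalArgumentException) {
            return ErrorTypes.InvalidInput;        // bad record -> failure FlowFile
        } else if (e instanceof IOException) {
            return ErrorTypes.TemporalFailure;     // transient -> retry / yield
        }
        return ErrorTypes.UnknownFailure;          // escalate as ProcessException
    });
    // When 'Rollback On Failure' is enabled, escalate the result so the session is rolled back
    // (or a ProcessException is thrown) instead of routing FlowFiles to failure or retry.
    handler.adjustError(RollbackOnFailure.createAdjustError(getLogger()));

At each per-record call site, handler.execute(...) then runs the body, invokes the supplied error callback when the body throws, and returns false so the loop can skip a record that has already been routed.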
http://git-wip-us.apache.org/repos/asf/nifi/blob/d9acdb54/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/processors/hive/SelectHiveQL.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/processors/hive/SelectHiveQL.java b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/processors/hive/SelectHiveQL.java
index 342fada..e61fa9f 100644
--- a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/processors/hive/SelectHiveQL.java
+++ b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/processors/hive/SelectHiveQL.java
@@ -50,11 +50,13 @@ import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.ProcessSessionFactory;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.io.InputStreamCallback;
import org.apache.nifi.processor.io.OutputStreamCallback;
import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.processor.util.pattern.PartialFunctions;
import org.apache.nifi.util.StopWatch;
import org.apache.nifi.util.hive.CsvOutputOptions;
import org.apache.nifi.util.hive.HiveJdbcCommon;
@@ -209,7 +211,11 @@ public class SelectHiveQL extends AbstractHiveQLProcessor {
}
@Override
- public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
+ public void onTrigger(ProcessContext context, ProcessSessionFactory sessionFactory) throws ProcessException {
+ PartialFunctions.onTrigger(context, sessionFactory, getLogger(), session -> onTrigger(context, session));
+ }
+
+ private void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
final FlowFile fileToProcess = (context.hasIncomingConnection()? session.get():null);
FlowFile flowfile = null;
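For contrast with the PutHiveStreaming change, the SelectHiveQL hunk above uses the plain PartialFunctions wrapper, which adds no rollback semantics of its own. A minimal sketch of that shape follows (the private method body is elided; both signatures are taken from the hunk above):

    @Override
    public void onTrigger(ProcessContext context, ProcessSessionFactory sessionFactory) throws ProcessException {
        // PartialFunctions.onTrigger creates a session from the factory, invokes the callback and
        // applies the shared commit/rollback handling, leaving the existing logic untouched.
        PartialFunctions.onTrigger(context, sessionFactory, getLogger(), session -> onTrigger(context, session));
    }

    private void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
        // existing SelectHiveQL logic, unchanged by this refactoring
    }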
http://git-wip-us.apache.org/repos/asf/nifi/blob/d9acdb54/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/test/java/org/apache/nifi/processors/hive/TestPutHiveQL.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/test/java/org/apache/nifi/processors/hive/TestPutHiveQL.java b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/test/java/org/apache/nifi/processors/hive/TestPutHiveQL.java
index c7498f9..5624f79 100644
--- a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/test/java/org/apache/nifi/processors/hive/TestPutHiveQL.java
+++ b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/test/java/org/apache/nifi/processors/hive/TestPutHiveQL.java
@@ -19,6 +19,7 @@ import org.apache.nifi.controller.AbstractControllerService;
import org.apache.nifi.dbcp.DBCPService;
import org.apache.nifi.dbcp.hive.HiveDBCPService;
import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.pattern.RollbackOnFailure;
import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
@@ -42,6 +43,7 @@ import java.util.Map;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
public class TestPutHiveQL {
private static final String createPersons = "CREATE TABLE PERSONS (id integer primary key, name varchar(100), code integer)";
@@ -128,6 +130,91 @@ public class TestPutHiveQL {
runner.assertTransferCount(PutHiveQL.REL_SUCCESS, 3);
}
+ @Test
+ public void testFailInMiddleWithBadStatementRollbackOnFailure() throws InitializationException, ProcessException, SQLException, IOException {
+ final TestRunner runner = TestRunners.newTestRunner(PutHiveQL.class);
+ final File tempDir = folder.getRoot();
+ final File dbDir = new File(tempDir, "db");
+ final DBCPService service = new MockDBCPService(dbDir.getAbsolutePath());
+ runner.addControllerService("dbcp", service);
+ runner.enableControllerService(service);
+ runner.setProperty(RollbackOnFailure.ROLLBACK_ON_FAILURE, "true");
+
+ try (final Connection conn = service.getConnection()) {
+ try (final Statement stmt = conn.createStatement()) {
+ stmt.executeUpdate(createPersonsAutoId);
+ }
+ }
+
+ runner.setProperty(PutHiveQL.HIVE_DBCP_SERVICE, "dbcp");
+ runner.enqueue("INSERT INTO PERSONS (NAME, CODE) VALUES ('Mark', 84)".getBytes());
+ runner.enqueue("INSERT INTO PERSONS".getBytes()); // intentionally wrong syntax
+ runner.enqueue("INSERT INTO PERSONS (NAME, CODE) VALUES ('Tom', 3)".getBytes());
+ runner.enqueue("INSERT INTO PERSONS (NAME, CODE) VALUES ('Harry', 44)".getBytes());
+ runner.run();
+
+ // The 1st FlowFile should be routed to success; the others should stay in the queue.
+ assertEquals(3, runner.getQueueSize().getObjectCount());
+ runner.assertTransferCount(PutHiveQL.REL_FAILURE, 0);
+ runner.assertTransferCount(PutHiveQL.REL_SUCCESS, 1);
+ }
+
+ @Test
+ public void testFailAtBeginning() throws InitializationException, ProcessException, SQLException, IOException {
+ final TestRunner runner = TestRunners.newTestRunner(PutHiveQL.class);
+ final File tempDir = folder.getRoot();
+ final File dbDir = new File(tempDir, "db");
+ final DBCPService service = new MockDBCPService(dbDir.getAbsolutePath());
+ runner.addControllerService("dbcp", service);
+ runner.enableControllerService(service);
+
+ try (final Connection conn = service.getConnection()) {
+ try (final Statement stmt = conn.createStatement()) {
+ stmt.executeUpdate(createPersonsAutoId);
+ }
+ }
+
+ runner.setProperty(PutHiveQL.HIVE_DBCP_SERVICE, "dbcp");
+ runner.enqueue("INSERT INTO PERSONS".getBytes()); // intentionally wrong syntax
+ runner.enqueue("INSERT INTO PERSONS (NAME, CODE) VALUES ('Tom', 3)".getBytes());
+ runner.enqueue("INSERT INTO PERSONS (NAME, CODE) VALUES ('Harry', 44)".getBytes());
+ runner.run();
+
+ runner.assertTransferCount(PutHiveQL.REL_FAILURE, 1);
+ runner.assertTransferCount(PutHiveQL.REL_SUCCESS, 2);
+ }
+
+ @Test
+ public void testFailAtBeginningRollbackOnFailure() throws InitializationException, ProcessException, SQLException, IOException {
+ final TestRunner runner = TestRunners.newTestRunner(PutHiveQL.class);
+ final File tempDir = folder.getRoot();
+ final File dbDir = new File(tempDir, "db");
+ final DBCPService service = new MockDBCPService(dbDir.getAbsolutePath());
+ runner.addControllerService("dbcp", service);
+ runner.enableControllerService(service);
+ runner.setProperty(RollbackOnFailure.ROLLBACK_ON_FAILURE, "true");
+
+ try (final Connection conn = service.getConnection()) {
+ try (final Statement stmt = conn.createStatement()) {
+ stmt.executeUpdate(createPersonsAutoId);
+ }
+ }
+
+ runner.setProperty(PutHiveQL.HIVE_DBCP_SERVICE, "dbcp");
+ runner.enqueue("INSERT INTO PERSONS".getBytes()); // intentionally wrong syntax
+ runner.enqueue("INSERT INTO PERSONS (NAME, CODE) VALUES ('Tom', 3)".getBytes());
+ runner.enqueue("INSERT INTO PERSONS (NAME, CODE) VALUES ('Harry', 44)".getBytes());
+ try {
+ runner.run();
+ fail("ProcessException should be thrown");
+ } catch (AssertionError e) {
+ assertTrue(e.getCause() instanceof ProcessException);
+ }
+
+ assertEquals(3, runner.getQueueSize().getObjectCount());
+ runner.assertTransferCount(PutHiveQL.REL_FAILURE, 0);
+ runner.assertTransferCount(PutHiveQL.REL_SUCCESS, 0);
+ }
@Test
public void testFailInMiddleWithBadParameterType() throws InitializationException, ProcessException, SQLException, IOException {
@@ -189,7 +276,56 @@ public class TestPutHiveQL {
final Map<String, String> badAttributes = new HashMap<>();
badAttributes.put("hiveql.args.1.type", String.valueOf(Types.INTEGER));
- badAttributes.put("hiveql.args.1.value", "9999");
+ badAttributes.put("hiveql.args.1.value", "101"); // Constraint violation, up to 100
+
+ final byte[] data = "INSERT INTO PERSONS (NAME, CODE) VALUES ('Mark', ?)".getBytes();
+ runner.enqueue(data, goodAttributes);
+ runner.enqueue(data, badAttributes);
+ runner.enqueue(data, goodAttributes);
+ runner.enqueue(data, goodAttributes);
+ runner.run();
+
+ runner.assertTransferCount(PutHiveQL.REL_SUCCESS, 3);
+ runner.assertTransferCount(PutHiveQL.REL_FAILURE, 1);
+
+ try (final Connection conn = service.getConnection()) {
+ try (final Statement stmt = conn.createStatement()) {
+ final ResultSet rs = stmt.executeQuery("SELECT * FROM PERSONS");
+ assertTrue(rs.next());
+ assertEquals(1, rs.getInt(1));
+ assertEquals("Mark", rs.getString(2));
+ assertEquals(84, rs.getInt(3));
+ assertTrue(rs.next());
+ assertTrue(rs.next());
+ assertFalse(rs.next());
+ }
+ }
+ }
+
+ @Test
+ public void testFailInMiddleWithBadNumberFormat() throws InitializationException, ProcessException, SQLException, IOException {
+ final TestRunner runner = TestRunners.newTestRunner(PutHiveQL.class);
+ final File tempDir = folder.getRoot();
+ final File dbDir = new File(tempDir, "db");
+ final DBCPService service = new MockDBCPService(dbDir.getAbsolutePath());
+ runner.addControllerService("dbcp", service);
+ runner.enableControllerService(service);
+
+ try (final Connection conn = service.getConnection()) {
+ try (final Statement stmt = conn.createStatement()) {
+ stmt.executeUpdate(createPersonsAutoId);
+ }
+ }
+
+ runner.setProperty(PutHiveQL.HIVE_DBCP_SERVICE, "dbcp");
+
+ final Map<String, String> goodAttributes = new HashMap<>();
+ goodAttributes.put("hiveql.args.1.type", String.valueOf(Types.INTEGER));
+ goodAttributes.put("hiveql.args.1.value", "84");
+
+ final Map<String, String> badAttributes = new HashMap<>();
+ badAttributes.put("hiveql.args.1.type", String.valueOf(Types.INTEGER));
+ badAttributes.put("hiveql.args.1.value", "NOT_NUMBER");
final byte[] data = "INSERT INTO PERSONS (NAME, CODE) VALUES ('Mark', ?)".getBytes();
runner.enqueue(data, goodAttributes);
@@ -540,6 +676,44 @@ public class TestPutHiveQL {
runner.assertAllFlowFilesTransferred(PutHiveQL.REL_RETRY, 1);
}
+ @Test
+ public void testRetryableFailureRollbackOnFailure() throws InitializationException, ProcessException, SQLException, IOException {
+ final TestRunner runner = TestRunners.newTestRunner(PutHiveQL.class);
+ final DBCPService service = new SQLExceptionService(null);
+ runner.addControllerService("dbcp", service);
+ runner.enableControllerService(service);
+
+ runner.setProperty(PutHiveQL.HIVE_DBCP_SERVICE, "dbcp");
+ runner.setProperty(RollbackOnFailure.ROLLBACK_ON_FAILURE, "true");
+
+ final String sql = "INSERT INTO PERSONS (ID, NAME, CODE) VALUES (?, ?, ?); " +
+ "UPDATE PERSONS SET NAME='George' WHERE ID=?; ";
+
+ final Map<String, String> attributes = new HashMap<>();
+ attributes.put("hiveql.args.1.type", String.valueOf(Types.INTEGER));
+ attributes.put("hiveql.args.1.value", "1");
+
+ attributes.put("hiveql.args.2.type", String.valueOf(Types.VARCHAR));
+ attributes.put("hiveql.args.2.value", "Mark");
+
+ attributes.put("hiveql.args.3.type", String.valueOf(Types.INTEGER));
+ attributes.put("hiveql.args.3.value", "84");
+
+ attributes.put("hiveql.args.4.type", String.valueOf(Types.INTEGER));
+ attributes.put("hiveql.args.4.value", "1");
+
+ runner.enqueue(sql.getBytes(), attributes);
+ try {
+ runner.run();
+ fail("Should throw ProcessException");
+ } catch (AssertionError e) {
+ assertTrue(e.getCause() instanceof ProcessException);
+ }
+
+ assertEquals(1, runner.getQueueSize().getObjectCount());
+ runner.assertAllFlowFilesTransferred(PutHiveQL.REL_RETRY, 0);
+ }
+
/**
* Simple implementation only for testing purposes
*/
@@ -607,7 +781,7 @@ public class TestPutHiveQL {
@Override
public String getConnectionURL() {
- return service.getConnectionURL();
+ return service != null ? service.getConnectionURL() : null;
}
}
}
[2/5] nifi git commit: NIFI-3415: Add Rollback on Failure.
Posted by ma...@apache.org.
http://git-wip-us.apache.org/repos/asf/nifi/blob/d9acdb54/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutDatabaseRecord.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutDatabaseRecord.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutDatabaseRecord.java
index fd414c4..0f65014 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutDatabaseRecord.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutDatabaseRecord.java
@@ -31,13 +31,19 @@ import org.apache.nifi.dbcp.DBCPService;
import org.apache.nifi.expression.AttributeExpression;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.logging.ComponentLog;
-import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.AbstractSessionFactoryProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.ProcessSessionFactory;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
-import org.apache.nifi.schema.access.SchemaNotFoundException;
+import org.apache.nifi.processor.util.pattern.ErrorTypes;
+import org.apache.nifi.processor.util.pattern.ExceptionHandler;
+import org.apache.nifi.processor.util.pattern.PartialFunctions;
+import org.apache.nifi.processor.util.pattern.Put;
+import org.apache.nifi.processor.util.pattern.RollbackOnFailure;
+import org.apache.nifi.processor.util.pattern.RoutingResult;
import org.apache.nifi.serialization.MalformedRecordException;
import org.apache.nifi.serialization.RecordReader;
import org.apache.nifi.serialization.RecordReaderFactory;
@@ -53,7 +59,9 @@ import java.sql.DatabaseMetaData;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
+import java.sql.SQLDataException;
import java.sql.SQLException;
+import java.sql.SQLIntegrityConstraintViolationException;
import java.sql.SQLNonTransientException;
import java.sql.Statement;
import java.util.ArrayList;
@@ -66,7 +74,8 @@ import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
-import java.util.stream.IntStream;
+
+import static java.lang.String.format;
@EventDriven
@@ -83,7 +92,7 @@ import java.util.stream.IntStream;
+ "will be used to determine the type of statement (INSERT, UPDATE, DELETE, SQL, etc.) to generate and execute.")
@WritesAttribute(attribute = PutDatabaseRecord.PUT_DATABASE_RECORD_ERROR, description = "If an error occurs during processing, the flow file will be routed to failure or retry, and this attribute "
+ "will be populated with the cause of the error.")
-public class PutDatabaseRecord extends AbstractProcessor {
+public class PutDatabaseRecord extends AbstractSessionFactoryProcessor {
static final String UPDATE_TYPE = "UPDATE";
static final String INSERT_TYPE = "INSERT";
@@ -289,10 +298,13 @@ public class PutDatabaseRecord extends AbstractProcessor {
pds.add(QUOTED_IDENTIFIERS);
pds.add(QUOTED_TABLE_IDENTIFIER);
pds.add(QUERY_TIMEOUT);
+ pds.add(RollbackOnFailure.ROLLBACK_ON_FAILURE);
propDescriptors = Collections.unmodifiableList(pds);
}
+ private Put<FunctionContext, Connection> process;
+ private ExceptionHandler<FunctionContext> exceptionHandler;
@Override
public Set<Relationship> getRelationships() {
@@ -316,318 +328,356 @@ public class PutDatabaseRecord extends AbstractProcessor {
.build();
}
+ private final PartialFunctions.InitConnection<FunctionContext, Connection> initConnection = (c, s, fc) -> {
+ final Connection connection = c.getProperty(DBCP_SERVICE).asControllerService(DBCPService.class).getConnection();
+ try {
+ fc.originalAutoCommit = connection.getAutoCommit();
+ connection.setAutoCommit(false);
+
+ String jdbcUrl = "DBCPService";
+ try {
+ DatabaseMetaData databaseMetaData = connection.getMetaData();
+ if (databaseMetaData != null) {
+ jdbcUrl = databaseMetaData.getURL();
+ }
+ } catch (SQLException se) {
+ // Ignore and use default JDBC URL. This shouldn't happen unless the driver doesn't implement getMetaData() properly
+ } finally {
+ fc.jdbcUrl = jdbcUrl;
+ }
+
+ } catch (SQLException e) {
+ throw new ProcessException("Failed to disable auto commit due to " + e, e);
+ }
+ return connection;
+ };
+
+ private final Put.PutFlowFile<FunctionContext, Connection> putFlowFile = (context, session, functionContext, conn, flowFile, result) -> {
+
+ exceptionHandler.execute(functionContext, flowFile, inputFlowFile -> {
+
+ // Get the statement type from the attribute if necessary
+ final String statementTypeProperty = context.getProperty(STATEMENT_TYPE).getValue();
+ String statementType = statementTypeProperty;
+ if (USE_ATTR_TYPE.equals(statementTypeProperty)) {
+ statementType = inputFlowFile.getAttribute(STATEMENT_TYPE_ATTRIBUTE);
+ }
+ if (StringUtils.isEmpty(statementType)) {
+ final String msg = format("Statement Type is not specified, FlowFile %s", inputFlowFile);
+ throw new IllegalArgumentException(msg);
+ }
+
+
+ try (final InputStream in = session.read(inputFlowFile)) {
+
+ final RecordReaderFactory recordParserFactory = context.getProperty(RECORD_READER_FACTORY)
+ .asControllerService(RecordReaderFactory.class);
+ final RecordReader recordParser = recordParserFactory.createRecordReader(inputFlowFile, in, getLogger());
+
+ if (SQL_TYPE.equalsIgnoreCase(statementType)) {
+ executeSQL(context, session, inputFlowFile, functionContext, result, conn, recordParser);
+
+ } else {
+ final DMLSettings settings = new DMLSettings(context);
+ executeDML(context, session, inputFlowFile, functionContext, result, conn, recordParser, statementType, settings);
+ }
+ }
+
+ }, (fc, inputFlowFile, r, e) -> {
+
+ getLogger().warn("Failed to process {} due to {}", new Object[]{inputFlowFile, e}, e);
+
+ if (e instanceof BatchUpdateException) {
+ try {
+ // Although the process session will move forward in order to route the failed FlowFile,
+ // the database transaction should be rolled back to avoid a partial batch update.
+ conn.rollback();
+ } catch (SQLException re) {
+ getLogger().error("Failed to rollback database due to {}, transaction may be incomplete.", new Object[]{re}, re);
+ }
+ }
+
+ // Embed the exception detail in a FlowFile attribute, then delegate error handling to the default and rollbackOnFailure handlers.
+ final FlowFile flowFileWithAttributes = session.putAttribute(inputFlowFile, PUT_DATABASE_RECORD_ERROR, e.getMessage());
+ final ExceptionHandler.OnError<FunctionContext, FlowFile> defaultOnError = ExceptionHandler.createOnError(context, session, result, REL_FAILURE, REL_RETRY);
+ final ExceptionHandler.OnError<FunctionContext, FlowFile> rollbackOnFailure = RollbackOnFailure.createOnError(defaultOnError);
+ rollbackOnFailure.apply(fc, flowFileWithAttributes, r, e);
+ });
+ };
+
+
@OnScheduled
public void onScheduled(final ProcessContext context) {
synchronized (this) {
schemaCache.clear();
}
+
+ process = new Put<>();
+
+ process.setLogger(getLogger());
+ process.initConnection(initConnection);
+ process.putFlowFile(putFlowFile);
+ process.adjustRoute(RollbackOnFailure.createAdjustRoute(REL_FAILURE, REL_RETRY));
+
+ process.onCompleted((c, s, fc, conn) -> {
+ try {
+ conn.commit();
+ } catch (SQLException e) {
+ // Throw a ProcessException to roll back the process session.
+ throw new ProcessException("Failed to commit database connection due to " + e, e);
+ }
+ });
+
+ process.onFailed((c, s, fc, conn, e) -> {
+ try {
+ conn.rollback();
+ } catch (SQLException re) {
+ // Just log the fact that rollback failed.
+ // The ProcessSession will be rolled back by the thrown Exception, so nothing more needs to be done here.
+ getLogger().warn("Failed to rollback database connection due to {}", new Object[]{re}, re);
+ }
+ });
+
+ process.cleanup((c, s, fc, conn) -> {
+ // make sure that we try to set the auto commit back to whatever it was.
+ if (fc.originalAutoCommit) {
+ try {
+ conn.setAutoCommit(true);
+ } catch (final SQLException se) {
+ getLogger().warn("Failed to reset autocommit due to {}", new Object[]{se});
+ }
+ }
+ });
+
+ exceptionHandler = new ExceptionHandler<>();
+ exceptionHandler.mapException(s -> {
+
+ try {
+ if (s == null) {
+ return ErrorTypes.PersistentFailure;
+ }
+ throw s;
+
+ } catch (IllegalArgumentException
+ |MalformedRecordException
+ |SQLNonTransientException e) {
+ return ErrorTypes.InvalidInput;
+
+ } catch (IOException
+ |SQLException e) {
+ return ErrorTypes.TemporalFailure;
+
+ } catch (Exception e) {
+ return ErrorTypes.UnknownFailure;
+ }
+
+ });
+ exceptionHandler.adjustError(RollbackOnFailure.createAdjustError(getLogger()));
}
- @Override
- public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
+ private static class FunctionContext extends RollbackOnFailure {
+ private final int queryTimeout;
+ private boolean originalAutoCommit = false;
+ private String jdbcUrl;
- FlowFile flowFile = session.get();
- if (flowFile == null) {
- return;
+ public FunctionContext(boolean rollbackOnFailure, int queryTimeout) {
+ super(rollbackOnFailure, true);
+ this.queryTimeout = queryTimeout;
}
+ }
- final ComponentLog log = getLogger();
-
- final RecordReaderFactory recordParserFactory = context.getProperty(RECORD_READER_FACTORY)
- .asControllerService(RecordReaderFactory.class);
- final String statementTypeProperty = context.getProperty(STATEMENT_TYPE).getValue();
- final DBCPService dbcpService = context.getProperty(DBCP_SERVICE).asControllerService(DBCPService.class);
- final boolean translateFieldNames = context.getProperty(TRANSLATE_FIELD_NAMES).asBoolean();
- final boolean ignoreUnmappedFields = IGNORE_UNMATCHED_FIELD.getValue().equalsIgnoreCase(context.getProperty(UNMATCHED_FIELD_BEHAVIOR).getValue());
- final Integer queryTimeout = context.getProperty(QUERY_TIMEOUT).evaluateAttributeExpressions().asTimePeriod(TimeUnit.SECONDS).intValue();
+ static class DMLSettings {
+ private final boolean translateFieldNames;
+ private final boolean ignoreUnmappedFields;
// Is the unmatched column behaviour fail or warning?
- final boolean failUnmappedColumns = FAIL_UNMATCHED_COLUMN.getValue().equalsIgnoreCase(context.getProperty(UNMATCHED_COLUMN_BEHAVIOR).getValue());
- final boolean warningUnmappedColumns = WARNING_UNMATCHED_COLUMN.getValue().equalsIgnoreCase(context.getProperty(UNMATCHED_COLUMN_BEHAVIOR).getValue());
+ private final boolean failUnmappedColumns;
+ private final boolean warningUnmappedColumns;
// Escape column names?
- final boolean escapeColumnNames = context.getProperty(QUOTED_IDENTIFIERS).asBoolean();
+ private final boolean escapeColumnNames;
// Quote table name?
- final boolean quoteTableName = context.getProperty(QUOTED_TABLE_IDENTIFIER).asBoolean();
+ private final boolean quoteTableName;
+
+ private DMLSettings(ProcessContext context) {
+ translateFieldNames = context.getProperty(TRANSLATE_FIELD_NAMES).asBoolean();
+ ignoreUnmappedFields = IGNORE_UNMATCHED_FIELD.getValue().equalsIgnoreCase(context.getProperty(UNMATCHED_FIELD_BEHAVIOR).getValue());
+
+ failUnmappedColumns = FAIL_UNMATCHED_COLUMN.getValue().equalsIgnoreCase(context.getProperty(UNMATCHED_COLUMN_BEHAVIOR).getValue());
+ warningUnmappedColumns = WARNING_UNMATCHED_COLUMN.getValue().equalsIgnoreCase(context.getProperty(UNMATCHED_COLUMN_BEHAVIOR).getValue());
+
+ escapeColumnNames = context.getProperty(QUOTED_IDENTIFIERS).asBoolean();
+ quoteTableName = context.getProperty(QUOTED_TABLE_IDENTIFIER).asBoolean();
+ }
+
+ }
- try (final Connection con = dbcpService.getConnection()) {
+ private void executeSQL(ProcessContext context, ProcessSession session,
+ FlowFile flowFile, FunctionContext functionContext, RoutingResult result,
+ Connection con, RecordReader recordParser)
+ throws IllegalArgumentException, MalformedRecordException, IOException, SQLException {
+
+ final RecordSchema recordSchema = recordParser.getSchema();
+
+ // Find which field has the SQL statement in it
+ final String sqlField = context.getProperty(FIELD_CONTAINING_SQL).evaluateAttributeExpressions(flowFile).getValue();
+ if (StringUtils.isEmpty(sqlField)) {
+ throw new IllegalArgumentException(format("SQL specified as Statement Type but no Field Containing SQL was found, FlowFile %s", flowFile));
+ }
+
+ boolean schemaHasSqlField = recordSchema.getFields().stream().anyMatch((field) -> sqlField.equals(field.getFieldName()));
+ if (!schemaHasSqlField) {
+ throw new IllegalArgumentException(format("Record schema does not contain Field Containing SQL: %s, FlowFile %s", sqlField, flowFile));
+ }
+
+ try (Statement s = con.createStatement()) {
- final boolean originalAutoCommit = con.getAutoCommit();
try {
- con.setAutoCommit(false);
+ s.setQueryTimeout(functionContext.queryTimeout); // timeout in seconds
+ } catch (SQLException se) {
+ // If the driver doesn't support query timeout, then assume it is "infinite". Allow a timeout of zero only
+ if (functionContext.queryTimeout > 0) {
+ throw se;
+ }
+ }
- String jdbcURL = "DBCPService";
- try {
- DatabaseMetaData databaseMetaData = con.getMetaData();
- if (databaseMetaData != null) {
- jdbcURL = databaseMetaData.getURL();
- }
- } catch (SQLException se) {
- // Ignore and use default JDBC URL. This shouldn't happen unless the driver doesn't implement getMetaData() properly
+ Record currentRecord;
+ while ((currentRecord = recordParser.nextRecord()) != null) {
+ Object sql = currentRecord.getValue(sqlField);
+ if (sql == null || StringUtils.isEmpty((String) sql)) {
+ throw new MalformedRecordException(format("Record had no (or null) value for Field Containing SQL: %s, FlowFile %s", sqlField, flowFile));
}
- final String catalog = context.getProperty(CATALOG_NAME).evaluateAttributeExpressions(flowFile).getValue();
- final String schemaName = context.getProperty(SCHEMA_NAME).evaluateAttributeExpressions(flowFile).getValue();
- final String tableName = context.getProperty(TABLE_NAME).evaluateAttributeExpressions(flowFile).getValue();
- final String updateKeys = context.getProperty(UPDATE_KEYS).evaluateAttributeExpressions(flowFile).getValue();
- final SchemaKey schemaKey = new SchemaKey(catalog, schemaName, tableName);
+ // Execute the statement as-is
+ s.execute((String) sql);
+ }
+ result.routeTo(flowFile, REL_SUCCESS);
+ session.getProvenanceReporter().send(flowFile, functionContext.jdbcUrl);
+ }
+ }
- // Get the statement type from the attribute if necessary
- String statementType = statementTypeProperty;
- if (USE_ATTR_TYPE.equals(statementTypeProperty)) {
- statementType = flowFile.getAttribute(STATEMENT_TYPE_ATTRIBUTE);
- }
- if (StringUtils.isEmpty(statementType)) {
- log.error("Statement Type is not specified, flowfile {} will be penalized and routed to failure", new Object[]{flowFile});
- flowFile = session.putAttribute(flowFile, PUT_DATABASE_RECORD_ERROR, "Statement Type not specified");
- flowFile = session.penalize(flowFile);
- session.transfer(flowFile, REL_FAILURE);
- } else {
- RecordSchema recordSchema;
- try (final InputStream in = session.read(flowFile)) {
-
- final RecordReader recordParser = recordParserFactory.createRecordReader(flowFile, in, log);
- recordSchema = recordParser.getSchema();
-
- if (SQL_TYPE.equalsIgnoreCase(statementType)) {
-
- // Find which field has the SQL statement in it
- final String sqlField = context.getProperty(FIELD_CONTAINING_SQL).evaluateAttributeExpressions(flowFile).getValue();
- if (StringUtils.isEmpty(sqlField)) {
- log.error("SQL specified as Statement Type but no Field Containing SQL was found, flowfile {} will be penalized and routed to failure", new Object[]{flowFile});
- flowFile = session.putAttribute(flowFile, PUT_DATABASE_RECORD_ERROR, "Field Containing SQL not found");
- flowFile = session.penalize(flowFile);
- session.transfer(flowFile, REL_FAILURE);
- } else {
- boolean schemaHasSqlField = recordSchema.getFields().stream().anyMatch((field) -> sqlField.equals(field.getFieldName()));
- if (schemaHasSqlField) {
- try (Statement s = con.createStatement()) {
-
- try {
- s.setQueryTimeout(queryTimeout); // timeout in seconds
- } catch (SQLException se) {
- // If the driver doesn't support query timeout, then assume it is "infinite". Allow a timeout of zero only
- if (queryTimeout > 0) {
- throw se;
- }
- }
-
- Record currentRecord;
- while ((currentRecord = recordParser.nextRecord()) != null) {
- Object sql = currentRecord.getValue(sqlField);
- if (sql != null && !StringUtils.isEmpty((String) sql)) {
- // Execute the statement as-is
- s.execute((String) sql);
- } else {
- log.error("Record had no (or null) value for Field Containing SQL: {}, flowfile {} will be penalized and routed to failure",
- new Object[]{sqlField, flowFile});
- flowFile = session.putAttribute(flowFile, PUT_DATABASE_RECORD_ERROR, "Field Containing SQL missing value");
- flowFile = session.penalize(flowFile);
- session.transfer(flowFile, REL_FAILURE);
- return;
- }
- }
- session.transfer(flowFile, REL_SUCCESS);
- session.getProvenanceReporter().send(flowFile, jdbcURL);
- } catch (final SQLNonTransientException e) {
- log.error("Failed to update database for {} due to {}; rolling back database and routing to failure", new Object[]{flowFile, e}, e);
- try {
- con.rollback();
- } catch (SQLException se) {
- log.error("Failed to rollback database, transaction may be incomplete.", se);
- }
- flowFile = session.putAttribute(flowFile, PUT_DATABASE_RECORD_ERROR, e.getMessage());
- flowFile = session.penalize(flowFile);
- session.transfer(flowFile, REL_FAILURE);
- } catch (final SQLException e) {
- log.error("Failed to update database for {} due to {}; rolling back database. It is possible that retrying the operation will succeed, so routing to retry",
- new Object[]{flowFile, e}, e);
- try {
- con.rollback();
- } catch (SQLException se) {
- log.error("Failed to rollback database, transaction may be incomplete.", se);
- }
- flowFile = session.putAttribute(flowFile, PUT_DATABASE_RECORD_ERROR, e.getMessage());
- flowFile = session.penalize(flowFile);
- session.transfer(flowFile, REL_RETRY);
- }
- } else {
- log.error("Record schema does not contain Field Containing SQL: {}, flowfile {} will be penalized and routed to failure", new Object[]{sqlField, flowFile});
- flowFile = session.putAttribute(flowFile, PUT_DATABASE_RECORD_ERROR, "Record schema missing Field Containing SQL value");
- flowFile = session.penalize(flowFile);
- session.transfer(flowFile, REL_FAILURE);
- }
- }
+ private void executeDML(ProcessContext context, ProcessSession session, FlowFile flowFile,
+ FunctionContext functionContext, RoutingResult result, Connection con,
+ RecordReader recordParser, String statementType, DMLSettings settings)
+ throws IllegalArgumentException, MalformedRecordException, IOException, SQLException {
- } else {
- // Ensure the table name has been set, the generated SQL statements (and TableSchema cache) will need it
- if (StringUtils.isEmpty(tableName)) {
- log.error("Cannot process {} because Table Name is null or empty; penalizing and routing to failure", new Object[]{flowFile});
- flowFile = session.putAttribute(flowFile, PUT_DATABASE_RECORD_ERROR, "Table Name missing");
- flowFile = session.penalize(flowFile);
- session.transfer(flowFile, REL_FAILURE);
- return;
- }
-
- // Always get the primary keys if Update Keys is empty. Otherwise if we have an Insert statement first, the table will be
- // cached but the primary keys will not be retrieved, causing future UPDATE statements to not have primary keys available
- final boolean includePrimaryKeys = (updateKeys == null);
-
- // get the database schema from the cache, if one exists. We do this in a synchronized block, rather than
- // using a ConcurrentMap because the Map that we are using is a LinkedHashMap with a capacity such that if
- // the Map grows beyond this capacity, old elements are evicted. We do this in order to avoid filling the
- // Java Heap if there are a lot of different SQL statements being generated that reference different tables.
- TableSchema schema;
- synchronized (this) {
- schema = schemaCache.get(schemaKey);
- if (schema == null) {
- // No schema exists for this table yet. Query the database to determine the schema and put it into the cache.
- try (final Connection conn = dbcpService.getConnection()) {
- schema = TableSchema.from(conn, catalog, schemaName, tableName, translateFieldNames, includePrimaryKeys);
- schemaCache.put(schemaKey, schema);
- } catch (final SQLNonTransientException e) {
- log.error("Failed to update database for {} due to {}; routing to failure", new Object[]{flowFile, e}, e);
- flowFile = session.putAttribute(flowFile, PUT_DATABASE_RECORD_ERROR, e.getMessage());
- flowFile = session.penalize(flowFile);
- session.transfer(flowFile, REL_FAILURE);
- return;
- } catch (final SQLException e) {
- log.error("Failed to update database for {} due to {}; it is possible that retrying the operation will succeed, so routing to retry",
- new Object[]{flowFile, e}, e);
- flowFile = session.putAttribute(flowFile, PUT_DATABASE_RECORD_ERROR, e.getMessage());
- flowFile = session.penalize(flowFile);
- session.transfer(flowFile, REL_RETRY);
- return;
- }
- }
- }
-
- final SqlAndIncludedColumns sqlHolder;
- try {
- // build the fully qualified table name
- final StringBuilder tableNameBuilder = new StringBuilder();
- if (catalog != null) {
- tableNameBuilder.append(catalog).append(".");
- }
- if (schemaName != null) {
- tableNameBuilder.append(schemaName).append(".");
- }
- tableNameBuilder.append(tableName);
- final String fqTableName = tableNameBuilder.toString();
-
- if (INSERT_TYPE.equalsIgnoreCase(statementType)) {
- sqlHolder = generateInsert(recordSchema, fqTableName, schema, translateFieldNames, ignoreUnmappedFields,
- failUnmappedColumns, warningUnmappedColumns, escapeColumnNames, quoteTableName);
- } else if (UPDATE_TYPE.equalsIgnoreCase(statementType)) {
- sqlHolder = generateUpdate(recordSchema, fqTableName, updateKeys, schema, translateFieldNames, ignoreUnmappedFields,
- failUnmappedColumns, warningUnmappedColumns, escapeColumnNames, quoteTableName);
- } else if (DELETE_TYPE.equalsIgnoreCase(statementType)) {
- sqlHolder = generateDelete(recordSchema, fqTableName, schema, translateFieldNames, ignoreUnmappedFields,
- failUnmappedColumns, warningUnmappedColumns, escapeColumnNames, quoteTableName);
- } else {
- log.error("Statement Type {} is not valid, flowfile {} will be penalized and routed to failure", new Object[]{statementType, flowFile});
- flowFile = session.putAttribute(flowFile, PUT_DATABASE_RECORD_ERROR, "Statement Type invalid");
- flowFile = session.penalize(flowFile);
- session.transfer(flowFile, REL_FAILURE);
- return;
- }
- } catch (final ProcessException pe) {
- log.error("Failed to convert {} to a SQL {} statement due to {}; routing to failure",
- new Object[]{flowFile, statementType, pe.toString()}, pe);
- flowFile = session.putAttribute(flowFile, PUT_DATABASE_RECORD_ERROR, pe.getMessage());
- flowFile = session.penalize(flowFile);
- session.transfer(flowFile, REL_FAILURE);
- return;
- }
-
- try (PreparedStatement ps = con.prepareStatement(sqlHolder.getSql())) {
-
- try {
- ps.setQueryTimeout(queryTimeout); // timeout in seconds
- } catch (SQLException se) {
- // If the driver doesn't support query timeout, then assume it is "infinite". Allow a timeout of zero only
- if (queryTimeout > 0) {
- throw se;
- }
- }
-
- Record currentRecord;
- List<Integer> fieldIndexes = sqlHolder.getFieldIndexes();
-
- while ((currentRecord = recordParser.nextRecord()) != null) {
- Object[] values = currentRecord.getValues();
- if (values != null) {
- if (fieldIndexes != null) {
- for (int i = 0; i < fieldIndexes.size(); i++) {
- ps.setObject(i + 1, values[fieldIndexes.get(i)]);
- }
- } else {
- // If there's no index map, assume all values are included and set them in order
- for (int i = 0; i < values.length; i++) {
- ps.setObject(i + 1, values[i]);
- }
- }
- ps.addBatch();
- }
- }
-
- log.debug("Executing query {}", new Object[]{sqlHolder});
- ps.executeBatch();
- session.transfer(flowFile, REL_SUCCESS);
- session.getProvenanceReporter().send(flowFile, jdbcURL);
-
- } catch (final SQLNonTransientException | BatchUpdateException e) {
- log.error("Failed to update database for {} due to {}; rolling back database, routing to failure", new Object[]{flowFile, e}, e);
- try {
- con.rollback();
- } catch (SQLException se) {
- log.error("Failed to rollback database, transaction may be incomplete.", se);
- }
- flowFile = session.putAttribute(flowFile, PUT_DATABASE_RECORD_ERROR, e.getMessage());
- flowFile = session.penalize(flowFile);
- session.transfer(flowFile, REL_FAILURE);
- } catch (final SQLException e) {
- log.error("Failed to update database for {} due to {}; rolling back database. It is possible that retrying the operation will succeed, so routing to retry",
- new Object[]{flowFile, e}, e);
- try {
- con.rollback();
- } catch (SQLException se) {
- log.error("Failed to rollback database, transaction may be incomplete.", se);
- }
- flowFile = session.putAttribute(flowFile, PUT_DATABASE_RECORD_ERROR, e.getMessage());
- flowFile = session.penalize(flowFile);
- session.transfer(flowFile, REL_RETRY);
- }
- }
- } catch (final MalformedRecordException | SchemaNotFoundException | IOException e) {
- log.error("Failed to determine schema of data records for {}, routing to failure", new Object[]{flowFile}, e);
+ final RecordSchema recordSchema = recordParser.getSchema();
+ final ComponentLog log = getLogger();
- flowFile = session.putAttribute(flowFile, PUT_DATABASE_RECORD_ERROR, e.getMessage());
- flowFile = session.penalize(flowFile);
- session.transfer(flowFile, REL_FAILURE);
- }
+ final String catalog = context.getProperty(CATALOG_NAME).evaluateAttributeExpressions(flowFile).getValue();
+ final String schemaName = context.getProperty(SCHEMA_NAME).evaluateAttributeExpressions(flowFile).getValue();
+ final String tableName = context.getProperty(TABLE_NAME).evaluateAttributeExpressions(flowFile).getValue();
+ final String updateKeys = context.getProperty(UPDATE_KEYS).evaluateAttributeExpressions(flowFile).getValue();
+ final SchemaKey schemaKey = new PutDatabaseRecord.SchemaKey(catalog, schemaName, tableName);
+
+ // Ensure the table name has been set, the generated SQL statements (and TableSchema cache) will need it
+ if (StringUtils.isEmpty(tableName)) {
+ throw new IllegalArgumentException(format("Cannot process %s because Table Name is null or empty", flowFile));
+ }
+
+ // Always get the primary keys if Update Keys is empty. Otherwise if we have an Insert statement first, the table will be
+ // cached but the primary keys will not be retrieved, causing future UPDATE statements to not have primary keys available
+ final boolean includePrimaryKeys = updateKeys == null;
+
+ // get the database schema from the cache, if one exists. We do this in a synchronized block, rather than
+ // using a ConcurrentMap because the Map that we are using is a LinkedHashMap with a capacity such that if
+ // the Map grows beyond this capacity, old elements are evicted. We do this in order to avoid filling the
+ // Java Heap if there are a lot of different SQL statements being generated that reference different tables.
+ TableSchema tableSchema;
+ synchronized (this) {
+ tableSchema = schemaCache.get(schemaKey);
+ if (tableSchema == null) {
+ // No schema exists for this table yet. Query the database to determine the schema and put it into the cache.
+ tableSchema = TableSchema.from(con, catalog, schemaName, tableName, settings.translateFieldNames, includePrimaryKeys);
+ schemaCache.put(schemaKey, tableSchema);
+ }
+ }
+ if (tableSchema == null) {
+ throw new IllegalArgumentException("No table schema specified!");
+ }
+
+ // build the fully qualified table name
+ final StringBuilder tableNameBuilder = new StringBuilder();
+ if (catalog != null) {
+ tableNameBuilder.append(catalog).append(".");
+ }
+ if (schemaName != null) {
+ tableNameBuilder.append(schemaName).append(".");
+ }
+ tableNameBuilder.append(tableName);
+ final String fqTableName = tableNameBuilder.toString();
+
+ if (recordSchema == null) {
+ throw new IllegalArgumentException("No record schema specified!");
+ }
+
+ final SqlAndIncludedColumns sqlHolder;
+ if (INSERT_TYPE.equalsIgnoreCase(statementType)) {
+ sqlHolder = generateInsert(recordSchema, fqTableName, tableSchema, settings);
+
+ } else if (UPDATE_TYPE.equalsIgnoreCase(statementType)) {
+ sqlHolder = generateUpdate(recordSchema, fqTableName, updateKeys, tableSchema, settings);
+
+ } else if (DELETE_TYPE.equalsIgnoreCase(statementType)) {
+ sqlHolder = generateDelete(recordSchema, fqTableName, tableSchema, settings);
+
+ } else {
+ throw new IllegalArgumentException(format("Statement Type %s is not valid, FlowFile %s", statementType, flowFile));
+ }
+
+ try (PreparedStatement ps = con.prepareStatement(sqlHolder.getSql())) {
+
+ final int queryTimeout = functionContext.queryTimeout;
+ try {
+ ps.setQueryTimeout(queryTimeout); // timeout in seconds
+ } catch (SQLException se) {
+ // If the driver doesn't support query timeout, then assume it is "infinite". Allow a timeout of zero only
+ if (queryTimeout > 0) {
+ throw se;
}
- } finally {
- try {
- con.commit();
- } finally {
- // make sure that we try to set the auto commit back to whatever it was.
- if (originalAutoCommit) {
- try {
- con.setAutoCommit(originalAutoCommit);
- } catch (final SQLException se) {
- // Nothing to do if it didn't work, indicates an issue with the driver
+ }
+
+ Record currentRecord;
+ List<Integer> fieldIndexes = sqlHolder.getFieldIndexes();
+
+ while ((currentRecord = recordParser.nextRecord()) != null) {
+ Object[] values = currentRecord.getValues();
+ if (values != null) {
+ if (fieldIndexes != null) {
+ for (int i = 0; i < fieldIndexes.size(); i++) {
+ ps.setObject(i + 1, values[fieldIndexes.get(i)]);
+ }
+ } else {
+ // If there's no index map, assume all values are included and set them in order
+ for (int i = 0; i < values.length; i++) {
+ ps.setObject(i + 1, values[i]);
}
}
+ ps.addBatch();
}
}
- } catch (final ProcessException | SQLException e) {
- log.error("Error occurred during processing, yielding the processor", e);
- context.yield();
+
+ log.debug("Executing query {}", new Object[]{sqlHolder});
+ ps.executeBatch();
+ result.routeTo(flowFile, REL_SUCCESS);
+ session.getProvenanceReporter().send(flowFile, functionContext.jdbcUrl);
+
}
}
+ @Override
+ public void onTrigger(ProcessContext context, ProcessSessionFactory sessionFactory) throws ProcessException {
+
+ final Boolean rollbackOnFailure = context.getProperty(RollbackOnFailure.ROLLBACK_ON_FAILURE).asBoolean();
+ final Integer queryTimeout = context.getProperty(QUERY_TIMEOUT).evaluateAttributeExpressions().asTimePeriod(TimeUnit.SECONDS).intValue();
+
+ final FunctionContext functionContext = new FunctionContext(rollbackOnFailure, queryTimeout);
+
+ RollbackOnFailure.onTrigger(context, sessionFactory, functionContext, getLogger(), session -> process.onTrigger(context, session, functionContext));
+
+ }
+
private Set<String> getNormalizedColumnNames(final RecordSchema schema, final boolean translateFieldNames) {
final Set<String> normalizedFieldNames = new HashSet<>();
if (schema != null) {
@@ -636,27 +686,19 @@ public class PutDatabaseRecord extends AbstractProcessor {
return normalizedFieldNames;
}
- SqlAndIncludedColumns generateInsert(final RecordSchema recordSchema, final String tableName,
- final TableSchema tableSchema, final boolean translateFieldNames, final boolean ignoreUnmappedFields, final boolean failUnmappedColumns,
- final boolean warningUnmappedColumns, boolean escapeColumnNames, boolean quoteTableName) {
-
- if (recordSchema == null) {
- throw new ProcessException("No table schema specified!");
- }
- if (tableSchema == null) {
- throw new ProcessException("No table schema specified!");
- }
+ SqlAndIncludedColumns generateInsert(final RecordSchema recordSchema, final String tableName, final TableSchema tableSchema, final DMLSettings settings)
+ throws IllegalArgumentException, SQLException {
- final Set<String> normalizedFieldNames = getNormalizedColumnNames(recordSchema, translateFieldNames);
+ final Set<String> normalizedFieldNames = getNormalizedColumnNames(recordSchema, settings.translateFieldNames);
for (final String requiredColName : tableSchema.getRequiredColumnNames()) {
- final String normalizedColName = normalizeColumnName(requiredColName, translateFieldNames);
+ final String normalizedColName = normalizeColumnName(requiredColName, settings.translateFieldNames);
if (!normalizedFieldNames.contains(normalizedColName)) {
String missingColMessage = "Record does not have a value for the Required column '" + requiredColName + "'";
- if (failUnmappedColumns) {
+ if (settings.failUnmappedColumns) {
getLogger().error(missingColMessage);
- throw new ProcessException(missingColMessage);
- } else if (warningUnmappedColumns) {
+ throw new IllegalArgumentException(missingColMessage);
+ } else if (settings.warningUnmappedColumns) {
getLogger().warn(missingColMessage);
}
}
@@ -664,7 +706,7 @@ public class PutDatabaseRecord extends AbstractProcessor {
final StringBuilder sqlBuilder = new StringBuilder();
sqlBuilder.append("INSERT INTO ");
- if (quoteTableName) {
+ if (settings.quoteTableName) {
sqlBuilder.append(tableSchema.getQuotedIdentifierString())
.append(tableName)
.append(tableSchema.getQuotedIdentifierString());
@@ -680,14 +722,13 @@ public class PutDatabaseRecord extends AbstractProcessor {
int fieldCount = fieldNames.size();
AtomicInteger fieldsFound = new AtomicInteger(0);
- IntStream.range(0, fieldCount).forEach((i) -> {
-
+ for (int i = 0; i < fieldCount; i++) {
RecordField field = recordSchema.getField(i);
String fieldName = field.getFieldName();
- final ColumnDescription desc = tableSchema.getColumns().get(normalizeColumnName(fieldName, translateFieldNames));
- if (desc == null && !ignoreUnmappedFields) {
- throw new ProcessException("Cannot map field '" + fieldName + "' to any column in the database");
+ final ColumnDescription desc = tableSchema.getColumns().get(normalizeColumnName(fieldName, settings.translateFieldNames));
+ if (desc == null && !settings.ignoreUnmappedFields) {
+ throw new SQLDataException("Cannot map field '" + fieldName + "' to any column in the database");
}
if (desc != null) {
@@ -695,7 +736,7 @@ public class PutDatabaseRecord extends AbstractProcessor {
sqlBuilder.append(", ");
}
- if (escapeColumnNames) {
+ if (settings.escapeColumnNames) {
sqlBuilder.append(tableSchema.getQuotedIdentifierString())
.append(desc.getColumnName())
.append(tableSchema.getQuotedIdentifierString());
@@ -704,7 +745,7 @@ public class PutDatabaseRecord extends AbstractProcessor {
}
includedColumns.add(i);
}
- });
+ }
// complete the SQL statements by adding ?'s for all of the values to be escaped.
sqlBuilder.append(") VALUES (");
@@ -712,22 +753,15 @@ public class PutDatabaseRecord extends AbstractProcessor {
sqlBuilder.append(")");
if (fieldsFound.get() == 0) {
- throw new ProcessException("None of the fields in the record map to the columns defined by the " + tableName + " table");
+ throw new SQLDataException("None of the fields in the record map to the columns defined by the " + tableName + " table");
}
}
return new SqlAndIncludedColumns(sqlBuilder.toString(), includedColumns);
}
SqlAndIncludedColumns generateUpdate(final RecordSchema recordSchema, final String tableName, final String updateKeys,
- final TableSchema tableSchema, final boolean translateFieldNames, final boolean ignoreUnmappedFields, final boolean failUnmappedColumns,
- final boolean warningUnmappedColumns, boolean escapeColumnNames, boolean quoteTableName) {
-
- if (recordSchema == null) {
- throw new ProcessException("No table schema specified!");
- }
- if (tableSchema == null) {
- throw new ProcessException("No table schema specified!");
- }
+ final TableSchema tableSchema, final DMLSettings settings)
+ throws IllegalArgumentException, MalformedRecordException, SQLException {
final Set<String> updateKeyNames;
if (updateKeys == null) {
@@ -740,12 +774,12 @@ public class PutDatabaseRecord extends AbstractProcessor {
}
if (updateKeyNames.isEmpty()) {
- throw new ProcessException("Table '" + tableName + "' does not have a Primary Key and no Update Keys were specified");
+ throw new SQLIntegrityConstraintViolationException("Table '" + tableName + "' does not have a Primary Key and no Update Keys were specified");
}
final StringBuilder sqlBuilder = new StringBuilder();
sqlBuilder.append("UPDATE ");
- if (quoteTableName) {
+ if (settings.quoteTableName) {
sqlBuilder.append(tableSchema.getQuotedIdentifierString())
.append(tableName)
.append(tableSchema.getQuotedIdentifierString());
@@ -755,18 +789,18 @@ public class PutDatabaseRecord extends AbstractProcessor {
// Create a Set of all normalized Update Key names, and ensure that there is a field in the record
// for each of the Update Key fields.
- final Set<String> normalizedFieldNames = getNormalizedColumnNames(recordSchema, translateFieldNames);
+ final Set<String> normalizedFieldNames = getNormalizedColumnNames(recordSchema, settings.translateFieldNames);
final Set<String> normalizedUpdateNames = new HashSet<>();
for (final String uk : updateKeyNames) {
- final String normalizedUK = normalizeColumnName(uk, translateFieldNames);
+ final String normalizedUK = normalizeColumnName(uk, settings.translateFieldNames);
normalizedUpdateNames.add(normalizedUK);
if (!normalizedFieldNames.contains(normalizedUK)) {
String missingColMessage = "Record does not have a value for the " + (updateKeys == null ? "Primary" : "Update") + "Key column '" + uk + "'";
- if (failUnmappedColumns) {
+ if (settings.failUnmappedColumns) {
getLogger().error(missingColMessage);
- throw new ProcessException(missingColMessage);
- } else if (warningUnmappedColumns) {
+ throw new MalformedRecordException(missingColMessage);
+ } else if (settings.warningUnmappedColumns) {
getLogger().warn(missingColMessage);
}
}
@@ -781,18 +815,17 @@ public class PutDatabaseRecord extends AbstractProcessor {
int fieldCount = fieldNames.size();
AtomicInteger fieldsFound = new AtomicInteger(0);
- IntStream.range(0, fieldCount).forEach((i) -> {
-
+ for (int i = 0; i < fieldCount; i++) {
RecordField field = recordSchema.getField(i);
String fieldName = field.getFieldName();
- final String normalizedColName = normalizeColumnName(fieldName, translateFieldNames);
- final ColumnDescription desc = tableSchema.getColumns().get(normalizeColumnName(fieldName, translateFieldNames));
+ final String normalizedColName = normalizeColumnName(fieldName, settings.translateFieldNames);
+ final ColumnDescription desc = tableSchema.getColumns().get(normalizeColumnName(fieldName, settings.translateFieldNames));
if (desc == null) {
- if (!ignoreUnmappedFields) {
- throw new ProcessException("Cannot map field '" + fieldName + "' to any column in the database");
+ if (!settings.ignoreUnmappedFields) {
+ throw new SQLDataException("Cannot map field '" + fieldName + "' to any column in the database");
} else {
- return;
+ continue;
}
}
@@ -803,7 +836,7 @@ public class PutDatabaseRecord extends AbstractProcessor {
sqlBuilder.append(", ");
}
- if (escapeColumnNames) {
+ if (settings.escapeColumnNames) {
sqlBuilder.append(tableSchema.getQuotedIdentifierString())
.append(desc.getColumnName())
.append(tableSchema.getQuotedIdentifierString());
@@ -814,19 +847,19 @@ public class PutDatabaseRecord extends AbstractProcessor {
sqlBuilder.append(" = ?");
includedColumns.add(i);
}
- });
+ }
// Set the WHERE clause based on the Update Key values
sqlBuilder.append(" WHERE ");
AtomicInteger whereFieldCount = new AtomicInteger(0);
- IntStream.range(0, fieldCount).forEach((i) -> {
+ for (int i = 0; i < fieldCount; i++) {
RecordField field = recordSchema.getField(i);
String fieldName = field.getFieldName();
- final String normalizedColName = normalizeColumnName(fieldName, translateFieldNames);
- final ColumnDescription desc = tableSchema.getColumns().get(normalizeColumnName(fieldName, translateFieldNames));
+ final String normalizedColName = normalizeColumnName(fieldName, settings.translateFieldNames);
+ final ColumnDescription desc = tableSchema.getColumns().get(normalizeColumnName(fieldName, settings.translateFieldNames));
if (desc != null) {
// Check if this column is a Update Key. If so, add it to the WHERE clause
@@ -836,7 +869,7 @@ public class PutDatabaseRecord extends AbstractProcessor {
sqlBuilder.append(" AND ");
}
- if (escapeColumnNames) {
+ if (settings.escapeColumnNames) {
sqlBuilder.append(tableSchema.getQuotedIdentifierString())
.append(normalizedColName)
.append(tableSchema.getQuotedIdentifierString());
@@ -847,31 +880,23 @@ public class PutDatabaseRecord extends AbstractProcessor {
includedColumns.add(i);
}
}
- });
+ }
}
return new SqlAndIncludedColumns(sqlBuilder.toString(), includedColumns);
}
- SqlAndIncludedColumns generateDelete(final RecordSchema recordSchema, final String tableName,
- final TableSchema tableSchema, final boolean translateFieldNames, final boolean ignoreUnmappedFields, final boolean failUnmappedColumns,
- final boolean warningUnmappedColumns, boolean escapeColumnNames, boolean quoteTableName) {
+ SqlAndIncludedColumns generateDelete(final RecordSchema recordSchema, final String tableName, final TableSchema tableSchema, final DMLSettings settings)
+ throws IllegalArgumentException, MalformedRecordException, SQLDataException {
- if (recordSchema == null) {
- throw new ProcessException("No table schema specified!");
- }
- if (tableSchema == null) {
- throw new ProcessException("No table schema specified!");
- }
-
- final Set<String> normalizedFieldNames = getNormalizedColumnNames(recordSchema, translateFieldNames);
+ final Set<String> normalizedFieldNames = getNormalizedColumnNames(recordSchema, settings.translateFieldNames);
for (final String requiredColName : tableSchema.getRequiredColumnNames()) {
- final String normalizedColName = normalizeColumnName(requiredColName, translateFieldNames);
+ final String normalizedColName = normalizeColumnName(requiredColName, settings.translateFieldNames);
if (!normalizedFieldNames.contains(normalizedColName)) {
String missingColMessage = "Record does not have a value for the Required column '" + requiredColName + "'";
- if (failUnmappedColumns) {
+ if (settings.failUnmappedColumns) {
getLogger().error(missingColMessage);
- throw new ProcessException(missingColMessage);
- } else if (warningUnmappedColumns) {
+ throw new MalformedRecordException(missingColMessage);
+ } else if (settings.warningUnmappedColumns) {
getLogger().warn(missingColMessage);
}
}
@@ -879,7 +904,7 @@ public class PutDatabaseRecord extends AbstractProcessor {
final StringBuilder sqlBuilder = new StringBuilder();
sqlBuilder.append("DELETE FROM ");
- if (quoteTableName) {
+ if (settings.quoteTableName) {
sqlBuilder.append(tableSchema.getQuotedIdentifierString())
.append(tableName)
.append(tableSchema.getQuotedIdentifierString());
@@ -895,14 +920,14 @@ public class PutDatabaseRecord extends AbstractProcessor {
int fieldCount = fieldNames.size();
AtomicInteger fieldsFound = new AtomicInteger(0);
- IntStream.range(0, fieldCount).forEach((i) -> {
+ for (int i = 0; i < fieldCount; i++) {
RecordField field = recordSchema.getField(i);
String fieldName = field.getFieldName();
- final ColumnDescription desc = tableSchema.getColumns().get(normalizeColumnName(fieldName, translateFieldNames));
- if (desc == null && !ignoreUnmappedFields) {
- throw new ProcessException("Cannot map field '" + fieldName + "' to any column in the database");
+ final ColumnDescription desc = tableSchema.getColumns().get(normalizeColumnName(fieldName, settings.translateFieldNames));
+ if (desc == null && !settings.ignoreUnmappedFields) {
+ throw new SQLDataException("Cannot map field '" + fieldName + "' to any column in the database");
}
if (desc != null) {
@@ -910,7 +935,7 @@ public class PutDatabaseRecord extends AbstractProcessor {
sqlBuilder.append(" AND ");
}
- if (escapeColumnNames) {
+ if (settings.escapeColumnNames) {
sqlBuilder.append(tableSchema.getQuotedIdentifierString())
.append(desc.getColumnName())
.append(tableSchema.getQuotedIdentifierString());
@@ -921,10 +946,10 @@ public class PutDatabaseRecord extends AbstractProcessor {
includedColumns.add(i);
}
- });
+ }
if (fieldsFound.get() == 0) {
- throw new ProcessException("None of the fields in the record map to the columns defined by the " + tableName + " table");
+ throw new SQLDataException("None of the fields in the record map to the columns defined by the " + tableName + " table");
}
}
@@ -985,7 +1010,7 @@ public class PutDatabaseRecord extends AbstractProcessor {
final Set<String> primaryKeyColumns = new HashSet<>();
if (includePrimaryKeys) {
- try (final ResultSet pkrs = conn.getMetaData().getPrimaryKeys(catalog, null, tableName)) {
+ try (final ResultSet pkrs = dmd.getPrimaryKeys(catalog, null, tableName)) {
while (pkrs.next()) {
final String colName = pkrs.getString("COLUMN_NAME");
[5/5] nifi git commit: NIFI-3415: Add Rollback on Failure.
Posted by ma...@apache.org.
NIFI-3415: Add Rollback on Failure.
- Added org.apache.nifi.processor.util.pattern package in nifi-processor-utilities containing reusable functions to mix in 'Rollback on Failure' capability.
- Created process pattern classes, Put and PutGroup. These will be helpful for standardizing Processor implementations.
- Applied Rollback on Failure to PutSQL, PutHiveQL, PutHiveStreaming and PutDatabaseRecord.
- Stopped using AbstractProcessor for these processors, as it penalizes FlowFiles being processed when it rolls back a process session. If FlowFiles are penalized, they will not be fetched again until the penalization expires.
- Yield the processor when a failure occurs and RollbackOnFailure is enabled. If we neither penalize nor yield, a failed FlowFile is retried too frequently.
- When Rollback on Failure is enabled but the processor is not transactional, discontinue processing when an error occurs after some inputs have already been processed successfully.
- Fixed existing issues in PutHiveStreaming:
- Output FlowFile Avro format was corrupted by concatenating multiple Avro files.
- Output FlowFile records had incorrect values because a GenericRecord instance was reused.
Signed-off-by: Matt Burgess <ma...@apache.org>
This closes #1658
Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/d9acdb54
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/d9acdb54
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/d9acdb54
Branch: refs/heads/master
Commit: d9acdb54bec96695837f8fcde54c58403aa46f29
Parents: a1bffbc
Author: Koji Kawamura <ij...@apache.org>
Authored: Thu Mar 2 09:51:12 2017 +0900
Committer: Matt Burgess <ma...@apache.org>
Committed: Thu Apr 27 13:44:56 2017 -0400
----------------------------------------------------------------------
nifi-commons/nifi-processor-utilities/pom.xml | 10 +
.../util/pattern/DiscontinuedException.java | 31 +
.../nifi/processor/util/pattern/ErrorTypes.java | 148 ++++
.../util/pattern/ExceptionHandler.java | 235 ++++++
.../util/pattern/PartialFunctions.java | 122 +++
.../apache/nifi/processor/util/pattern/Put.java | 228 ++++++
.../nifi/processor/util/pattern/PutGroup.java | 97 +++
.../util/pattern/RollbackOnFailure.java | 226 ++++++
.../processor/util/pattern/RoutingResult.java | 50 ++
.../util/pattern/TestExceptionHandler.java | 202 +++++
.../util/pattern/TestRollbackOnFailure.java | 144 ++++
.../nifi-hive-processors/pom.xml | 4 +
.../hive/AbstractHiveQLProcessor.java | 10 +-
.../apache/nifi/processors/hive/PutHiveQL.java | 160 ++--
.../nifi/processors/hive/PutHiveStreaming.java | 575 ++++++++------
.../nifi/processors/hive/SelectHiveQL.java | 8 +-
.../nifi/processors/hive/TestPutHiveQL.java | 178 ++++-
.../processors/hive/TestPutHiveStreaming.java | 493 +++++++++++-
.../processors/standard/PutDatabaseRecord.java | 747 ++++++++++---------
.../apache/nifi/processors/standard/PutSQL.java | 723 ++++++++++--------
.../standard/TestPutDatabaseRecord.groovy | 142 +++-
.../nifi/processors/standard/TestPutSQL.java | 328 ++++++++
22 files changed, 3856 insertions(+), 1005 deletions(-)
----------------------------------------------------------------------
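For orientation before the per-file diffs: the following is a minimal sketch of how a processor adopting these utilities can delegate its onTrigger, modeled on the PutDatabaseRecord change included in this commit. The MyFunctionContext type, its constructor, and the `process` pattern field are illustrative assumptions rather than code from the commit.

    // Sketch only; mirrors the shape of the PutDatabaseRecord.onTrigger change in this diff.
    // MyFunctionContext (assumed to extend RollbackOnFailure) and `process` (a Put/PutGroup
    // instance constructed at @OnScheduled) are hypothetical names.
    @Override
    public void onTrigger(final ProcessContext context, final ProcessSessionFactory sessionFactory) throws ProcessException {
        final Boolean rollbackOnFailure = context.getProperty(RollbackOnFailure.ROLLBACK_ON_FAILURE).asBoolean();
        final MyFunctionContext functionContext = new MyFunctionContext(rollbackOnFailure);
        // The helper creates a session, runs the pattern, and on error either routes/penalizes
        // FlowFiles or fails the whole session, depending on the Rollback on Failure setting.
        RollbackOnFailure.onTrigger(context, sessionFactory, functionContext, getLogger(),
                session -> process.onTrigger(context, session, functionContext));
    }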
http://git-wip-us.apache.org/repos/asf/nifi/blob/d9acdb54/nifi-commons/nifi-processor-utilities/pom.xml
----------------------------------------------------------------------
diff --git a/nifi-commons/nifi-processor-utilities/pom.xml b/nifi-commons/nifi-processor-utilities/pom.xml
index 054f89b..ce5ae0b 100644
--- a/nifi-commons/nifi-processor-utilities/pom.xml
+++ b/nifi-commons/nifi-processor-utilities/pom.xml
@@ -53,5 +53,15 @@
<artifactId>nifi-ssl-context-service-api</artifactId>
<scope>provided</scope>
</dependency>
+ <dependency>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-mock</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ <scope>test</scope>
+ </dependency>
</dependencies>
</project>
http://git-wip-us.apache.org/repos/asf/nifi/blob/d9acdb54/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/pattern/DiscontinuedException.java
----------------------------------------------------------------------
diff --git a/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/pattern/DiscontinuedException.java b/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/pattern/DiscontinuedException.java
new file mode 100644
index 0000000..f97f31d
--- /dev/null
+++ b/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/pattern/DiscontinuedException.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processor.util.pattern;
+
+/**
+ * Represents that a looping process was discontinued.
+ * When a method throws this exception, its caller should stop processing further inputs and return immediately.
+ */
+public class DiscontinuedException extends RuntimeException {
+ public DiscontinuedException(String message) {
+ super(message);
+ }
+
+ public DiscontinuedException(String message, Throwable cause) {
+ super(message, cause);
+ }
+}
http://git-wip-us.apache.org/repos/asf/nifi/blob/d9acdb54/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/pattern/ErrorTypes.java
----------------------------------------------------------------------
diff --git a/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/pattern/ErrorTypes.java b/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/pattern/ErrorTypes.java
new file mode 100644
index 0000000..c6cf140
--- /dev/null
+++ b/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/pattern/ErrorTypes.java
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processor.util.pattern;
+
+import static org.apache.nifi.processor.util.pattern.ErrorTypes.Destination.Failure;
+import static org.apache.nifi.processor.util.pattern.ErrorTypes.Destination.ProcessException;
+import static org.apache.nifi.processor.util.pattern.ErrorTypes.Destination.Retry;
+import static org.apache.nifi.processor.util.pattern.ErrorTypes.Destination.Self;
+import static org.apache.nifi.processor.util.pattern.ErrorTypes.Penalty.None;
+import static org.apache.nifi.processor.util.pattern.ErrorTypes.Penalty.Penalize;
+import static org.apache.nifi.processor.util.pattern.ErrorTypes.Penalty.Yield;
+
+/**
+ * Represents general error types and how they should be treated.
+ */
+public enum ErrorTypes {
+
+ /**
+ * The procedure's setting has to be fixed; otherwise the same error will occur regardless of the input.
+ * In order to NOT call the failing process frequently, the processor should be yielded.
+ */
+ PersistentFailure(ProcessException, Yield),
+
+ /**
+ * It is unknown whether the error is persistent or temporal, related to the input or not.
+ */
+ UnknownFailure(ProcessException, None),
+
+ /**
+ * The input will be sent to the failure route for recovery without penalizing.
+ * Basically, the input should not be sent to the same procedure again unless the issue has been solved.
+ */
+ InvalidInput(Failure, None),
+
+ /**
+ * The procedure is temporarily unavailable, usually due to an external service being unavailable.
+ * Retrying may be successful, but the processor should be yielded for a while.
+ */
+ TemporalFailure(Retry, Yield),
+
+ /**
+ * The input was not processed successfully due to some temporal error
+ * related to the specifics of the input. Retrying may be successful,
+ * but the input should be penalized for a while.
+ */
+ TemporalInputFailure(Retry, Penalize),
+
+ /**
+ * The input was not ready to be processed. It will be kept in the incoming queue and also be penalized.
+ */
+ Defer(Self, Penalize);
+
+ private final Destination destination;
+ private final Penalty penalty;
+ ErrorTypes(Destination destination, Penalty penalty){
+ this.destination = destination;
+ this.penalty = penalty;
+ }
+
+ public Result result() {
+ return new Result(destination, penalty);
+ }
+
+ /**
+ * Represents the destination of input.
+ */
+ public enum Destination {
+ ProcessException, Failure, Retry, Self
+ }
+
+ /**
+ * Indicates whether to yield the processor or penalize the input when transferring it.
+ */
+ public enum Penalty {
+ Yield, Penalize, None
+ }
+
+ public Destination destination(){
+ return this.destination;
+ }
+
+ public Penalty penalty(){
+ return this.penalty;
+ }
+
+ /**
+ * Result represents a result of a procedure.
+ * ErrorTypes enum contains basic error result patterns.
+ */
+ public static class Result {
+ private final Destination destination;
+ private final Penalty penalty;
+
+ public Result(Destination destination, Penalty penalty) {
+ this.destination = destination;
+ this.penalty = penalty;
+ }
+
+ public Destination destination() {
+ return destination;
+ }
+
+ public Penalty penalty() {
+ return penalty;
+ }
+
+ @Override
+ public String toString() {
+ return "Result{" +
+ "destination=" + destination +
+ ", penalty=" + penalty +
+ '}';
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+
+ Result result = (Result) o;
+
+ if (destination != result.destination) return false;
+ return penalty == result.penalty;
+ }
+
+ @Override
+ public int hashCode() {
+ int result = destination != null ? destination.hashCode() : 0;
+ result = 31 * result + (penalty != null ? penalty.hashCode() : 0);
+ return result;
+ }
+ }
+
+}
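To make the intent of these constants concrete, the helper below shows the kind of classification a processor could pass to ExceptionHandler.mapException (next file); the specific mapping is a hypothetical example, not code from this commit.

    // Hypothetical mapping from SQL exceptions to ErrorTypes (illustrative only).
    private ErrorTypes mapSqlException(final Exception e) {
        if (e instanceof java.sql.SQLNonTransientException) {
            return ErrorTypes.InvalidInput;     // Destination.Failure, Penalty.None
        }
        if (e instanceof java.sql.SQLException) {
            return ErrorTypes.TemporalFailure;  // Destination.Retry, Penalty.Yield
        }
        return ErrorTypes.UnknownFailure;       // Destination.ProcessException, Penalty.None
    }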
http://git-wip-us.apache.org/repos/asf/nifi/blob/d9acdb54/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/pattern/ExceptionHandler.java
----------------------------------------------------------------------
diff --git a/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/pattern/ExceptionHandler.java b/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/pattern/ExceptionHandler.java
new file mode 100644
index 0000000..bd1c9eb
--- /dev/null
+++ b/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/pattern/ExceptionHandler.java
@@ -0,0 +1,235 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processor.util.pattern;
+
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.pattern.ErrorTypes.Result;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.function.BiFunction;
+import java.util.function.Function;
+
+/**
+ * <p>ExceptionHandler provides a structured Exception handling logic composed by reusable partial functions.
+ *
+ * <p>
+ * Benefits of using ExceptionHandler:
+ * <li>Externalized error handling code which provides cleaner program only focusing on the expected path.</li>
+ * <li>Classify specific Exceptions into {@link ErrorTypes}, consolidated error handling based on error type.</li>
+ * <li>Context aware error handling, {@link RollbackOnFailure} for instance.</li>
+ * </p>
+ */
+public class ExceptionHandler<C> {
+
+ @FunctionalInterface
+ public interface Procedure<I> {
+ void apply(I input) throws Exception;
+ }
+
+ public interface OnError<C, I> {
+ void apply(C context, I input, Result result, Exception e);
+
+ default OnError<C, I> andThen(OnError<C, I> after) {
+ return (c, i, r, e) -> {
+ apply(c, i, r, e);
+ after.apply(c, i, r, e);
+ };
+ }
+ }
+
+ /**
+ * Simply categorise an Exception.
+ */
+ private Function<Exception, ErrorTypes> mapException;
+
+ /**
+ * Adjust error type based on the context.
+ */
+ private BiFunction<C, ErrorTypes, Result> adjustError;
+
+ /**
+ * Do some action to the input based on the final error type.
+ */
+ private OnError<C, ?> onError;
+
+ /**
+ * Specify a function that maps an Exception to certain ErrorType.
+ */
+ public void mapException(Function<Exception, ErrorTypes> mapException) {
+ this.mapException = mapException;
+ }
+
+ /**
+ * <p>Specify a function that adjusts an ErrorType based on a function context.
+ * <p>For example, {@link RollbackOnFailure#createAdjustError(ComponentLog)} decides
+ * whether a process session should rollback or transfer input to failure or retry.
+ */
+ public void adjustError(BiFunction<C, ErrorTypes, Result> adjustError) {
+ this.adjustError = adjustError;
+ }
+
+ /**
+ * <p>Specify a default OnError function that will be called if one is not explicitly specified when {@link #execute(Object, Object, Procedure)} is called.
+ */
+ public void onError(OnError<C, ?> onError) {
+ this.onError = onError;
+ }
+
+ /**
+ * <p>Executes specified procedure function with the input.
+ * <p>Default OnError function will be called when an exception is thrown.
+ * @param context function context
+ * @param input input for procedure
+ * @param procedure a function that does something with the input
+ * @return True if the procedure finished without issue. False if procedure threw an Exception but it was handled by {@link OnError}.
+ * @throws ProcessException Thrown if the exception was not handled by {@link OnError}
+ * @throws DiscontinuedException Indicating the exception was handled by {@link OnError} but process should stop immediately
+ * without processing any further input
+ */
+ @SuppressWarnings("unchecked")
+ public <I> boolean execute(C context, I input, Procedure<I> procedure) throws ProcessException, DiscontinuedException {
+ return execute(context, input, procedure, (OnError<C, I>) onError);
+ }
+
+ /**
+ * <p>Executes specified procedure function with the input.
+ * @param context function context
+ * @param input input for procedure
+ * @param procedure a function that does something with the input
+ * @param onError specify {@link OnError} function for this execution
+ * @return True if the procedure finished without issue. False if procedure threw an Exception but it was handled by {@link OnError}.
+ * @throws ProcessException Thrown if the exception was not handled by {@link OnError}
+ * @throws DiscontinuedException Indicating the exception was handled by {@link OnError} but process should stop immediately
+ * without processing any further input
+ */
+ public <I> boolean execute(C context, I input, Procedure<I> procedure, OnError<C, I> onError) throws ProcessException, DiscontinuedException {
+ try {
+ procedure.apply(input);
+ return true;
+ } catch (Exception e) {
+
+ if (mapException == null) {
+ throw new ProcessException("An exception was thrown: " + e, e);
+ }
+
+ final ErrorTypes type = mapException.apply(e);
+
+ final Result result;
+ if (adjustError != null) {
+ result = adjustError.apply(context, type);
+ } else {
+ result = new Result(type.destination(), type.penalty());
+ }
+
+ if (onError == null) {
+ throw new IllegalStateException("OnError is not set.");
+ }
+
+ onError.apply(context, input, result, e);
+ }
+ return false;
+ }
+
+ private static FlowFile penalize(final ProcessContext context, final ProcessSession session,
+ final FlowFile flowFile, final ErrorTypes.Penalty penalty) {
+ switch (penalty) {
+ case Penalize:
+ return session.penalize(flowFile);
+ case Yield:
+ context.yield();
+ }
+ return flowFile;
+ }
+
+ /**
+ * Create a {@link OnError} function instance that routes input based on {@link Result} destination and penalty.
+ * @param context process context is used to yield a processor
+ * @param session process session is used to penalize a FlowFile
+ * @param routingResult input FlowFile will be routed to a destination relationship in this {@link RoutingResult}
+ * @param relFailure specify failure relationship of a processor
+ * @param relRetry specify retry relationship of a processor
+ * @return composed function
+ */
+ public static <C> ExceptionHandler.OnError<C, FlowFile> createOnError(
+ final ProcessContext context, final ProcessSession session, final RoutingResult routingResult,
+ final Relationship relFailure, final Relationship relRetry) {
+
+ return (fc, input, result, e) -> {
+ final PartialFunctions.FlowFileGroup flowFileGroup = () -> Collections.singletonList(input);
+ createOnGroupError(context, session, routingResult, relFailure, relRetry).apply(fc, flowFileGroup, result, e);
+ };
+ }
+
+ /**
+ * Same as {@link #createOnError(ProcessContext, ProcessSession, RoutingResult, Relationship, Relationship)} for FlowFileGroup.
+ * @param context process context is used to yield a processor
+ * @param session process session is used to penalize FlowFiles
+ * @param routingResult input FlowFiles will be routed to a destination relationship in this {@link RoutingResult}
+ * @param relFailure specify failure relationship of a processor
+ * @param relRetry specify retry relationship of a processor
+ * @return composed function
+ */
+ public static <C, I extends PartialFunctions.FlowFileGroup> ExceptionHandler.OnError<C, I> createOnGroupError(
+ final ProcessContext context, final ProcessSession session, final RoutingResult routingResult,
+ final Relationship relFailure, final Relationship relRetry) {
+ return (c, g, r, e) -> {
+ final Relationship routeTo;
+ switch (r.destination()) {
+ case Failure:
+ routeTo = relFailure;
+ break;
+ case Retry:
+ routeTo = relRetry;
+ break;
+ case Self:
+ routeTo = Relationship.SELF;
+ break;
+ default:
+ if (e instanceof ProcessException) {
+ throw (ProcessException)e;
+ } else {
+ Object inputs = null;
+ if (g != null) {
+ final List<FlowFile> flowFiles = g.getFlowFiles();
+ switch (flowFiles.size()) {
+ case 0:
+ inputs = "[]";
+ break;
+ case 1:
+ inputs = flowFiles.get(0);
+ break;
+ default:
+ inputs = String.format("%d FlowFiles including %s", flowFiles.size(), flowFiles.get(0));
+ break;
+ }
+ }
+ throw new ProcessException(String.format("Failed to process %s due to %s", inputs, e), e);
+ }
+ }
+ for (FlowFile f : g.getFlowFiles()) {
+ final FlowFile maybePenalized = penalize(context, session, f, r.penalty());
+ routingResult.routeTo(maybePenalized, routeTo);
+ }
+ };
+ }
+}
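A rough sketch of how a Put-style processor might wire this handler together; the FunctionContext type (assumed to extend RollbackOnFailure), the REL_FAILURE/REL_RETRY relationships, and the work performed inside execute are assumptions for illustration.

    // Illustrative wiring, typically done once when the processor is scheduled.
    final ExceptionHandler<FunctionContext> handler = new ExceptionHandler<>();
    handler.mapException(e -> e instanceof java.sql.SQLNonTransientException
            ? ErrorTypes.InvalidInput : ErrorTypes.UnknownFailure);
    // Let Rollback on Failure adjust the destination (see createAdjustError in the Javadoc above).
    handler.adjustError(RollbackOnFailure.createAdjustError(getLogger()));

    // Later, per FlowFile, with a RoutingResult `result` in scope:
    handler.execute(functionContext, flowFile, input -> {
        // the work that may throw, e.g. executing a statement built from `input`
    }, ExceptionHandler.createOnError(context, session, result, REL_FAILURE, REL_RETRY));

If execute handles the exception, it routes and penalizes the FlowFile according to the Result and returns false; otherwise the error surfaces as a ProcessException or DiscontinuedException, as documented above.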
http://git-wip-us.apache.org/repos/asf/nifi/blob/d9acdb54/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/pattern/PartialFunctions.java
----------------------------------------------------------------------
diff --git a/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/pattern/PartialFunctions.java b/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/pattern/PartialFunctions.java
new file mode 100644
index 0000000..8332289
--- /dev/null
+++ b/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/pattern/PartialFunctions.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processor.util.pattern;
+
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.ProcessSessionFactory;
+import org.apache.nifi.processor.exception.ProcessException;
+
+import java.util.List;
+
+/**
+ * This class contains various partial functions that are reusable among process patterns.
+ */
+public class PartialFunctions {
+
+ @FunctionalInterface
+ public interface InitConnection<FC, C> {
+ C apply(ProcessContext context, ProcessSession session, FC functionContext) throws ProcessException;
+ }
+
+ @FunctionalInterface
+ public interface FetchFlowFiles<FC> {
+ List<FlowFile> apply(ProcessContext context, ProcessSession session, FC functionContext, RoutingResult result) throws ProcessException;
+ }
+
+ @FunctionalInterface
+ public interface OnCompleted<FC, C> {
+ void apply(ProcessContext context, ProcessSession session, FC functionContext, C connection) throws ProcessException;
+ }
+
+ @FunctionalInterface
+ public interface OnFailed<FC, C> {
+ void apply(ProcessContext context, ProcessSession session, FC functionContext, C connection, Exception e) throws ProcessException;
+ }
+
+ @FunctionalInterface
+ public interface Cleanup<FC, C> {
+ void apply(ProcessContext context, ProcessSession session, FC functionContext, C connection) throws ProcessException;
+ }
+
+ @FunctionalInterface
+ public interface FlowFileGroup {
+ List<FlowFile> getFlowFiles();
+ }
+
+ @FunctionalInterface
+ public interface AdjustRoute<FC> {
+ void apply(ProcessContext context, ProcessSession session, FC functionContext, RoutingResult result) throws ProcessException;
+ }
+
+ @FunctionalInterface
+ public interface TransferFlowFiles<FC> {
+ void apply(ProcessContext context, ProcessSession session, FC functionContext, RoutingResult result) throws ProcessException;
+
+ default TransferFlowFiles<FC> andThen(TransferFlowFiles<FC> after) {
+ return (context, session, functionContext, result) -> {
+ apply(context, session, functionContext, result);
+ after.apply(context, session, functionContext, result);
+ };
+ }
+ }
+
+ public static <FCT> PartialFunctions.FetchFlowFiles<FCT> fetchSingleFlowFile() {
+ return (context, session, functionContext, result) -> session.get(1);
+ }
+
+ public static <FCT> PartialFunctions.TransferFlowFiles<FCT> transferRoutedFlowFiles() {
+ return (context, session, functionContext, result)
+ -> result.getRoutedFlowFiles().forEach(((relationship, routedFlowFiles)
+ -> session.transfer(routedFlowFiles, relationship)));
+ }
+
+ @FunctionalInterface
+ public interface OnTrigger {
+ void execute(ProcessSession session) throws ProcessException;
+ }
+
+ @FunctionalInterface
+ public interface RollbackSession {
+ void rollback(ProcessSession session, Throwable t);
+ }
+
+ /**
+ * <p>This method is identical to what {@link org.apache.nifi.processor.AbstractProcessor#onTrigger(ProcessContext, ProcessSession)} does.</p>
+ * <p>Create a session from ProcessSessionFactory and execute specified onTrigger function, and commit the session if onTrigger finishes successfully.</p>
+ * <p>When an Exception is thrown during execution of the onTrigger, the session will be rolled back and FlowFiles being processed will be penalized.</p>
+ */
+ public static void onTrigger(ProcessContext context, ProcessSessionFactory sessionFactory, ComponentLog logger, OnTrigger onTrigger) throws ProcessException {
+ onTrigger(context, sessionFactory, logger, onTrigger, (session, t) -> session.rollback(true));
+ }
+
+ public static void onTrigger(
+ ProcessContext context, ProcessSessionFactory sessionFactory, ComponentLog logger, OnTrigger onTrigger,
+ RollbackSession rollbackSession) throws ProcessException {
+ final ProcessSession session = sessionFactory.createSession();
+ try {
+ onTrigger.execute(session);
+ session.commit();
+ } catch (final Throwable t) {
+ logger.error("{} failed to process due to {}; rolling back session", new Object[]{onTrigger, t});
+ rollbackSession.rollback(session, t);
+ throw t;
+ }
+ }
+}
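A minimal usage sketch, assuming a processor that extends AbstractSessionFactoryProcessor rather than AbstractProcessor:

    @Override
    public void onTrigger(final ProcessContext context, final ProcessSessionFactory sessionFactory) throws ProcessException {
        // Same behavior AbstractProcessor provides: create a session, run the body,
        // commit on success, and roll back (penalizing in-flight FlowFiles) on error.
        PartialFunctions.onTrigger(context, sessionFactory, getLogger(), session -> {
            // normal onTrigger body goes here
        });
    }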
http://git-wip-us.apache.org/repos/asf/nifi/blob/d9acdb54/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/pattern/Put.java
----------------------------------------------------------------------
diff --git a/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/pattern/Put.java b/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/pattern/Put.java
new file mode 100644
index 0000000..790f48a
--- /dev/null
+++ b/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/pattern/Put.java
@@ -0,0 +1,228 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processor.util.pattern;
+
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+
+import java.util.List;
+import java.util.Objects;
+import java.util.stream.Collectors;
+
+/**
+ * Abstract Put pattern class with a generic onTrigger method structure, composed with various partial functions.
+ * @param <FC> Class of context instance which is passed to each partial function.
+ * The lifetime of a function context should be limited to a single onTrigger method.
+ * @param <C> Class of connection to a data storage that this pattern puts data into.
+ */
+public class Put<FC, C extends AutoCloseable> {
+ protected PartialFunctions.InitConnection<FC, C> initConnection;
+ protected PartialFunctions.FetchFlowFiles<FC> fetchFlowFiles = PartialFunctions.fetchSingleFlowFile();
+ protected PutFlowFile<FC, C> putFlowFile;
+ protected PartialFunctions.TransferFlowFiles<FC> transferFlowFiles = PartialFunctions.transferRoutedFlowFiles();
+ protected PartialFunctions.AdjustRoute<FC> adjustRoute;
+ protected PartialFunctions.OnCompleted<FC, C> onCompleted;
+ protected PartialFunctions.OnFailed<FC, C> onFailed;
+ protected PartialFunctions.Cleanup<FC, C> cleanup;
+ protected ComponentLog logger;
+
+ /**
+ * Put fetched FlowFiles to a data storage.
+ * @param context process context passed from a Processor onTrigger.
+ * @param session process session passed from a Processor onTrigger.
+ * @param functionContext function context passed from a Processor onTrigger.
+ * @param connection connection to data storage, established by {@link PartialFunctions.InitConnection}.
+ * @param flowFiles FlowFiles fetched from {@link PartialFunctions.FetchFlowFiles}.
+ * @param result Route incoming FlowFiles if necessary.
+ */
+ protected void putFlowFiles(ProcessContext context, ProcessSession session,
+ FC functionContext, C connection, List<FlowFile> flowFiles, RoutingResult result) throws ProcessException {
+ for (FlowFile flowFile : flowFiles) {
+ putFlowFile.apply(context, session, functionContext, connection, flowFile, result);
+ }
+ }
+
+ protected void validateCompositePattern() {
+ Objects.requireNonNull(initConnection, "InitConnection function is required.");
+ Objects.requireNonNull(putFlowFile, "PutFlowFile function is required.");
+ Objects.requireNonNull(transferFlowFiles, "TransferFlowFiles function is required.");
+ }
+
+ /**
+ * <p>Processor using this pattern is expected to call this method from its onTrigger.
+ * <p>Typical usage would be constructing a process pattern instance at a processor method
+ * which is annotated with {@link org.apache.nifi.annotation.lifecycle.OnScheduled},
+ * and use pattern.onTrigger from processor.onTrigger.
+ * <p>{@link PartialFunctions.InitConnection} is required at a minimum, in addition to any functions required by an implementation class.
+ * @param context process context passed from a Processor onTrigger.
+ * @param session process session passed from a Processor onTrigger.
+ * @param functionContext function context should be instantiated per onTrigger call.
+ * @throws ProcessException Each partial function can throw ProcessException if onTrigger should stop immediately.
+ */
+ public void onTrigger(ProcessContext context, ProcessSession session, FC functionContext) throws ProcessException {
+
+ validateCompositePattern();
+
+ final RoutingResult result = new RoutingResult();
+ final List<FlowFile> flowFiles = fetchFlowFiles.apply(context, session, functionContext, result);
+
+ // Transfer FlowFiles if there is any.
+ result.getRoutedFlowFiles().forEach(((relationship, routedFlowFiles) ->
+ session.transfer(routedFlowFiles, relationship)));
+
+ if (flowFiles == null || flowFiles.isEmpty()) {
+ logger.debug("No incoming FlowFiles.");
+ return;
+ }
+
+ try (C connection = initConnection.apply(context, session, functionContext)) {
+
+ try {
+ // Execute the core function.
+ try {
+ putFlowFiles(context, session, functionContext, connection, flowFiles, result);
+ } catch (DiscontinuedException e) {
+ // Whether it was an error or semi-normal depends on the implementation and the reason it wanted to discontinue.
+ // So, no logging is needed here.
+ }
+
+ // Extension point to alter routes.
+ if (adjustRoute != null) {
+ adjustRoute.apply(context, session, functionContext, result);
+ }
+
+ // Put fetched, but unprocessed FlowFiles back to self.
+ final List<FlowFile> transferredFlowFiles = result.getRoutedFlowFiles().values().stream()
+ .flatMap(List::stream).collect(Collectors.toList());
+ final List<FlowFile> unprocessedFlowFiles = flowFiles.stream()
+ .filter(flowFile -> !transferredFlowFiles.contains(flowFile)).collect(Collectors.toList());
+ result.routeTo(unprocessedFlowFiles, Relationship.SELF);
+
+ // OnCompleted processing.
+ if (onCompleted != null) {
+ onCompleted.apply(context, session, functionContext, connection);
+ }
+
+ // Transfer FlowFiles.
+ transferFlowFiles.apply(context, session, functionContext, result);
+
+ } catch (Exception e) {
+ if (onFailed != null) {
+ onFailed.apply(context, session, functionContext, connection, e);
+ }
+ throw e;
+ } finally {
+ if (cleanup != null) {
+ cleanup.apply(context, session, functionContext, connection);
+ }
+ }
+
+ } catch (ProcessException e) {
+ throw e;
+ } catch (Exception e) {
+ // Throw uncaught exception as RuntimeException so that this processor will be yielded.
+ final String msg = String.format("Failed to execute due to %s", e);
+ logger.error(msg, e);
+ throw new RuntimeException(msg, e);
+ }
+
+ }
+
+ /**
+ * Specify an optional function that fetches incoming FlowFiles.
+ * If not specified, a single FlowFile is fetched on each onTrigger.
+ * @param f Function to fetch incoming FlowFiles.
+ */
+ public void fetchFlowFiles(PartialFunctions.FetchFlowFiles<FC> f) {
+ fetchFlowFiles = f;
+ }
+
+ /**
+ * Specify a function that establishes a connection to target data storage.
+ * This function will be called only when there are valid incoming FlowFiles.
+ * The created connection instance is automatically closed when onTrigger is finished.
+ * @param f Function to initiate a connection to a data storage.
+ */
+ public void initConnection(PartialFunctions.InitConnection<FC, C> f) {
+ initConnection = f;
+ }
+
+ /**
+ * Specify a function that puts an incoming FlowFile to target data storage.
+ * @param f a function to put a FlowFile to target storage.
+ */
+ public void putFlowFile(PutFlowFile<FC, C> f) {
+ this.putFlowFile = f;
+ }
+
+ /**
+ * Specify an optional function that adjusts routed FlowFiles before transferring them.
+ * @param f a function to adjust routes.
+ */
+ public void adjustRoute(PartialFunctions.AdjustRoute<FC> f) {
+ this.adjustRoute = f;
+ }
+
+ /**
+ * Specify an optional function responsible for transferring routed FlowFiles.
+ * If not specified, routed FlowFiles are simply transferred to their destinations by default.
+ * @param f a function to transfer routed FlowFiles.
+ */
+ public void transferFlowFiles(PartialFunctions.TransferFlowFiles<FC> f) {
+ this.transferFlowFiles = f;
+ }
+
+ /**
+ * Specify an optional function which will be called when input FlowFiles have been successfully put to the target storage.
+ * @param f Function to be called when a put operation finishes successfully.
+ */
+ public void onCompleted(PartialFunctions.OnCompleted<FC, C> f) {
+ onCompleted = f;
+ }
+
+ /**
+ * Specify an optional function which will be called if input FlowFiles fail to be put to the target storage.
+ * @param f Function to be called when a put operation failed.
+ */
+ public void onFailed(PartialFunctions.OnFailed<FC, C> f) {
+ onFailed = f;
+ }
+
+ /**
+ * Specify an optional function which will be called in a finally block.
+ * Typically useful when a special cleanup operation is needed for the connection.
+ * @param f Function to be called when a put operation finishes, regardless of whether it succeeded.
+ */
+ public void cleanup(PartialFunctions.Cleanup<FC, C> f) {
+ cleanup = f;
+ }
+
+ public void setLogger(ComponentLog logger) {
+ this.logger = logger;
+ }
+
+ @FunctionalInterface
+ public interface PutFlowFile<FC, C> {
+ void apply(ProcessContext context, ProcessSession session, FC functionContext, C connection,
+ FlowFile flowFile, RoutingResult result) throws ProcessException;
+ }
+
+}
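
As the Javadoc on onTrigger above notes, a processor builds the pattern once in an @OnScheduled method and delegates its own onTrigger to the pattern. Below is a minimal sketch of that wiring, not part of this commit; the processor name, property, function context and JDBC statement handling are illustrative assumptions only (getRelationships/getSupportedPropertyDescriptors overrides are omitted for brevity).

    import java.sql.Connection;

    import org.apache.nifi.annotation.lifecycle.OnScheduled;
    import org.apache.nifi.components.PropertyDescriptor;
    import org.apache.nifi.dbcp.DBCPService;
    import org.apache.nifi.processor.AbstractProcessor;
    import org.apache.nifi.processor.ProcessContext;
    import org.apache.nifi.processor.ProcessSession;
    import org.apache.nifi.processor.Relationship;
    import org.apache.nifi.processor.exception.ProcessException;
    import org.apache.nifi.processor.util.pattern.Put;

    // Hypothetical processor, for illustration only.
    public class ExamplePutProcessor extends AbstractProcessor {

        static final PropertyDescriptor CONNECTION_POOL = new PropertyDescriptor.Builder()
                .name("connection-pool")
                .identifiesControllerService(DBCPService.class)
                .required(true)
                .build();

        static final Relationship REL_SUCCESS = new Relationship.Builder().name("success").build();

        // Function context; a new instance is created per onTrigger call.
        private static class FunctionContext {
            int processed;
        }

        private volatile Put<FunctionContext, Connection> process;

        @OnScheduled
        public void constructProcess() {
            process = new Put<>();
            process.setLogger(getLogger());

            // Called only when there are valid incoming FlowFiles; the connection is closed automatically.
            process.initConnection((context, session, fc) -> {
                final DBCPService dbcp = context.getProperty(CONNECTION_POOL).asControllerService(DBCPService.class);
                return dbcp.getConnection();
            });

            // Core function: put a single FlowFile to the target storage, then route it.
            process.putFlowFile((context, session, fc, connection, flowFile, result) -> {
                // ... execute a statement using 'connection' and the FlowFile content ...
                fc.processed++;
                result.routeTo(flowFile, REL_SUCCESS);
            });
        }

        @Override
        public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
            process.onTrigger(context, session, new FunctionContext());
        }
    }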
http://git-wip-us.apache.org/repos/asf/nifi/blob/d9acdb54/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/pattern/PutGroup.java
----------------------------------------------------------------------
diff --git a/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/pattern/PutGroup.java b/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/pattern/PutGroup.java
new file mode 100644
index 0000000..6e9da2e
--- /dev/null
+++ b/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/pattern/PutGroup.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processor.util.pattern;
+
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.exception.ProcessException;
+
+import java.util.List;
+import java.util.Objects;
+
+/**
+ * Extended Put pattern capable of handling FlowFile groups.
+ * @param <FC> Function context class.
+ * @param <C> Connection class.
+ * @param <FFG> FlowFileGroup class.
+ */
+public class PutGroup<FC, C extends AutoCloseable, FFG extends PartialFunctions.FlowFileGroup> extends Put<FC, C> {
+
+
+ public PutGroup() {
+ // Just to make a composition valid.
+ this.putFlowFile = (context, session, functionContext, connection, inputFlowFile, result) -> {
+ throw new UnsupportedOperationException();
+ };
+ }
+
+ @FunctionalInterface
+ public interface PutFlowFiles<FC, C, FFG> {
+ void apply(ProcessContext context, ProcessSession session, FC functionContext, C connection,
+ FFG inputFlowFileGroup, RoutingResult result) throws ProcessException;
+ }
+
+ @Override
+ protected void validateCompositePattern() {
+ super.validateCompositePattern();
+ Objects.requireNonNull(groupFlowFiles, "GroupFlowFiles function is required.");
+ }
+
+ /**
+ * PutGroup does not support the PutFlowFile function for a single FlowFile.
+ * Throws UnsupportedOperationException if called.
+ */
+ @Override
+ public void putFlowFile(PutFlowFile<FC, C> putFlowFile) {
+ throw new UnsupportedOperationException("PutFlowFile can not be used with PutGroup pattern. Specify PutFlowFiles instead.");
+ }
+
+ @FunctionalInterface
+ public interface GroupFlowFiles<FC, C, FFG> {
+ List<FFG> apply(ProcessContext context, ProcessSession session, FC functionContext, C connection, List<FlowFile> flowFiles, RoutingResult result) throws ProcessException;
+ }
+
+ private GroupFlowFiles<FC, C, FFG> groupFlowFiles;
+ private PutFlowFiles<FC, C, FFG> putFlowFiles;
+
+ /**
+ * Specify a function that groups input FlowFiles into FlowFile groups.
+ */
+ public void groupFetchedFlowFiles(GroupFlowFiles<FC, C, FFG> f) {
+ groupFlowFiles = f;
+ }
+
+ /**
+ * Specify a function that puts an input FlowFile group to a target storage using a given connection.
+ */
+ public void putFlowFiles(PutFlowFiles<FC, C, FFG> f) {
+ putFlowFiles = f;
+ }
+
+
+ @Override
+ protected void putFlowFiles(ProcessContext context, ProcessSession session, FC functionContext,
+ C connection, List<FlowFile> flowFiles, RoutingResult result) throws ProcessException {
+ final List<FFG> flowFileGroups = groupFlowFiles
+ .apply(context, session, functionContext, connection, flowFiles, result);
+
+ for (FFG group : flowFileGroups) {
+ putFlowFiles.apply(context, session, functionContext, connection, group, result);
+ }
+ }
+}
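
To contrast with Put, the sketch below (not part of this commit) shows the extra wiring PutGroup expects: fetched FlowFiles are first grouped, then each group is put with one call. It extends the hypothetical ExamplePutProcessor from the earlier sketch, assumes PartialFunctions.FlowFileGroup exposes getFlowFiles(), and uses a purely illustrative grouping attribute.

    // Hypothetical group type; assumes FlowFileGroup declares List<FlowFile> getFlowFiles().
    static class TableGroup implements PartialFunctions.FlowFileGroup {
        final List<FlowFile> flowFiles = new ArrayList<>();

        @Override
        public List<FlowFile> getFlowFiles() {
            return flowFiles;
        }
    }

    private void configurePutGroup(final PutGroup<FunctionContext, Connection, TableGroup> process) {
        // Group the fetched FlowFiles, e.g. by a 'target.table' attribute (illustrative).
        process.groupFetchedFlowFiles((context, session, fc, connection, flowFiles, result) -> {
            final Map<String, TableGroup> groups = new LinkedHashMap<>();
            for (final FlowFile flowFile : flowFiles) {
                final String table = flowFile.getAttribute("target.table");
                groups.computeIfAbsent(table, t -> new TableGroup()).flowFiles.add(flowFile);
            }
            return new ArrayList<>(groups.values());
        });

        // Put each group with a single call, then route its FlowFiles together.
        process.putFlowFiles((context, session, fc, connection, group, result) -> {
            // ... execute one batched statement for the whole group using 'connection' ...
            result.routeTo(group.getFlowFiles(), REL_SUCCESS);
        });
    }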
http://git-wip-us.apache.org/repos/asf/nifi/blob/d9acdb54/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/pattern/RollbackOnFailure.java
----------------------------------------------------------------------
diff --git a/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/pattern/RollbackOnFailure.java b/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/pattern/RollbackOnFailure.java
new file mode 100644
index 0000000..2d4d768
--- /dev/null
+++ b/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/pattern/RollbackOnFailure.java
@@ -0,0 +1,226 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processor.util.pattern;
+
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSessionFactory;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.processor.util.pattern.PartialFunctions.AdjustRoute;
+
+import java.util.List;
+import java.util.Map;
+import java.util.function.BiFunction;
+
+/**
+ * <p>RollbackOnFailure can be used as a function context for process patterns such as {@link Put} to provide configurable error handling.
+ *
+ * <p>
+ * RollbackOnFailure can add the following characteristics to a processor:
+ * <li>When disabled, input FlowFiles that caused an error will be routed to the 'failure' or 'retry' relationship, based on the type of error.</li>
+ * <li>When enabled, input FlowFiles are kept in the input queue. A ProcessException is thrown to roll back the process session.</li>
+ * <li>It assumes that anything done during a processor's onTrigger can be rolled back, if the processor is marked as transactional.</li>
+ * <li>If transactional and enabled, the session is rolled back when an error occurs, even if some FlowFiles have already been processed.</li>
+ * <li>If not transactional and enabled, the session is rolled back on error only if no progress has been made.</li>
+ * </p>
+ *
+ * <p>There are two approaches to applying RollbackOnFailure. One is using {@link ExceptionHandler#adjustError(BiFunction)},
+ * and the other is implementing the processor onTrigger using process patterns such as {@link Put#adjustRoute(AdjustRoute)}. </p>
+ *
+ * <p>It's also possible to use both approaches. ExceptionHandler applies immediately when an Exception is thrown, while AdjustRoute responds later but requires less code.</p>
+ */
+public class RollbackOnFailure {
+
+ private final boolean rollbackOnFailure;
+ private final boolean transactional;
+ private boolean discontinue;
+
+ private int processedCount = 0;
+
+ /**
+ * Constructor.
+ * @param rollbackOnFailure Should be set by the user via processor configuration.
+ * @param transactional Specify whether a processor is transactional.
+ * If not, it is important to call {@link #proceed()} after successful execution of the processor's task,
+ * which indicates that the processor performed an operation that cannot be undone.
+ */
+ public RollbackOnFailure(boolean rollbackOnFailure, boolean transactional) {
+ this.rollbackOnFailure = rollbackOnFailure;
+ this.transactional = transactional;
+ }
+
+ public static final PropertyDescriptor ROLLBACK_ON_FAILURE = createRollbackOnFailureProperty("");
+
+ public static PropertyDescriptor createRollbackOnFailureProperty(String additionalDescription) {
+ return new PropertyDescriptor.Builder()
+ .name("rollback-on-failure")
+ .displayName("Rollback On Failure")
+ .description("Specify how to handle error." +
+ " By default (false), if an error occurs while processing a FlowFile, the FlowFile will be routed to" +
+ " 'failure' or 'retry' relationship based on error type, and processor can continue with next FlowFile." +
+ " Instead, you may want to rollback currently processed FlowFiles and stop further processing immediately." +
+ " In that case, you can do so by enabling this 'Rollback On Failure' property. " +
+ " If enabled, failed FlowFiles will stay in the input relationship without penalizing it and being processed repeatedly" +
+ " until it gets processed successfully or removed by other means." +
+ " It is important to set adequate 'Yield Duration' to avoid retrying too frequently." + additionalDescription)
+ .allowableValues("true", "false")
+ .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
+ .defaultValue("false")
+ .required(true)
+ .build();
+ }
+
+ /**
+ * Create a function to use with {@link ExceptionHandler} that adjusts the error type based on the function context.
+ */
+ public static <FCT extends RollbackOnFailure> BiFunction<FCT, ErrorTypes, ErrorTypes.Result> createAdjustError(final ComponentLog logger) {
+ return (c, t) -> {
+
+ ErrorTypes.Result adjusted = null;
+ switch (t.destination()) {
+
+ case ProcessException:
+ // If this process can roll back, leave the error as ProcessException so that the session will be rolled back.
+ if (!c.canRollback()) {
+ // If an exception is thrown while the processor is not transactional and the processed count > 0, adjust the destination to self,
+ // in order to stop any further processing until this input is processed successfully.
+ // If we threw an Exception in this state, the already succeeded FlowFiles would be rolled back, too.
+ // In case the progress was made by other preceding inputs,
+ // those successful inputs should be sent to 'success' while this input stays in the incoming queue.
+ // In case this input made some progress against the external system, the partial update would be replayed,
+ // which can cause duplicated data.
+ c.discontinue();
+ // We should not penalize the FlowFile; if we did, other FlowFiles could be fetched first.
+ // We need to block the others from being processed until this one finishes.
+ adjusted = new ErrorTypes.Result(ErrorTypes.Destination.Self, ErrorTypes.Penalty.Yield);
+ }
+ break;
+
+ case Failure:
+ case Retry:
+ if (c.isRollbackOnFailure()) {
+ c.discontinue();
+ if (c.canRollback()) {
+ // If this process can roll back, then throw a ProcessException instead, in order to roll back.
+ adjusted = new ErrorTypes.Result(ErrorTypes.Destination.ProcessException, ErrorTypes.Penalty.Yield);
+ } else {
+ // If not, keep the FlowFile in the incoming queue by routing it to self.
+ adjusted = new ErrorTypes.Result(ErrorTypes.Destination.Self, ErrorTypes.Penalty.Yield);
+ }
+ }
+ break;
+ }
+
+ if (adjusted != null) {
+ if (logger.isDebugEnabled()) {
+ logger.debug("Adjusted {} to {} based on context rollbackOnFailure={}, processedCount={}, transactional={}",
+ new Object[]{t, adjusted, c.isRollbackOnFailure(), c.getProcessedCount(), c.isTransactional()});
+ }
+ return adjusted;
+ }
+
+ return t.result();
+ };
+ }
+
+ /**
+ * Create an {@link AdjustRoute} function to use with a process pattern such as {@link Put} that adjusts routed FlowFiles based on the context.
+ * This function works as a safety net by covering cases where a Processor implementation did not use ExceptionHandler and transferred FlowFiles
+ * without considering the RollbackOnFailure context.
+ */
+ public static <FCT extends RollbackOnFailure> AdjustRoute<FCT> createAdjustRoute(Relationship ... failureRelationships) {
+ return (context, session, fc, result) -> {
+ if (fc.isRollbackOnFailure()) {
+ // Check if route contains failure relationship.
+ for (Relationship failureRelationship : failureRelationships) {
+ if (!result.contains(failureRelationship)) {
+ continue;
+ }
+ if (fc.canRollback()) {
+ throw new ProcessException(String.format(
+ "A FlowFile is routed to %s. Rollback session based on context rollbackOnFailure=%s, processedCount=%d, transactional=%s",
+ failureRelationship.getName(), fc.isRollbackOnFailure(), fc.getProcessedCount(), fc.isTransactional()));
+ } else {
+ // Send failed FlowFiles to self.
+ final Map<Relationship, List<FlowFile>> routedFlowFiles = result.getRoutedFlowFiles();
+ final List<FlowFile> failedFlowFiles = routedFlowFiles.remove(failureRelationship);
+ result.routeTo(failedFlowFiles, Relationship.SELF);
+ }
+ }
+ }
+ };
+ }
+
+ public static <FCT extends RollbackOnFailure, I> ExceptionHandler.OnError<FCT, I> createOnError(ExceptionHandler.OnError<FCT, I> onError) {
+ return onError.andThen((context, input, result, e) -> {
+ if (context.shouldDiscontinue()) {
+ throw new DiscontinuedException("Discontinue processing due to " + e, e);
+ }
+ });
+ }
+
+ public static <FCT extends RollbackOnFailure> void onTrigger(
+ ProcessContext context, ProcessSessionFactory sessionFactory, FCT functionContext, ComponentLog logger,
+ PartialFunctions.OnTrigger onTrigger) throws ProcessException {
+
+ PartialFunctions.onTrigger(context, sessionFactory, logger, onTrigger, (session, t) -> {
+ // If RollbackOnFailure is enabled, do not penalize the FlowFiles being processed when rolling back,
+ // so that they stay in the incoming relationship and are processed again.
+ final boolean shouldPenalize = !functionContext.isRollbackOnFailure();
+ session.rollback(shouldPenalize);
+
+ // However, keeping a failed FlowFile in the incoming relationship would cause it to be retried too often.
+ // So, administratively yield the processor.
+ if (functionContext.isRollbackOnFailure()) {
+ logger.warn("Administratively yielding {} after rolling back due to {}", new Object[]{context.getName(), t}, t);
+ context.yield();
+ }
+ });
+ }
+
+ public int proceed() {
+ return ++processedCount;
+ }
+
+ public int getProcessedCount() {
+ return processedCount;
+ }
+
+ public boolean isRollbackOnFailure() {
+ return rollbackOnFailure;
+ }
+
+ public boolean isTransactional() {
+ return transactional;
+ }
+
+ public boolean canRollback() {
+ return transactional || processedCount == 0;
+ }
+
+ public boolean shouldDiscontinue() {
+ return discontinue;
+ }
+
+ public void discontinue() {
+ this.discontinue = true;
+ }
+}
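
The class comment above lists two ways to apply RollbackOnFailure: through ExceptionHandler#adjustError and through Put#adjustRoute. A minimal sketch of wiring both into the hypothetical ExamplePutProcessor from the earlier sketch follows (not part of this commit); the function-context class, relationships and exception mapping are illustrative assumptions.

    // Hypothetical function context carrying the RollbackOnFailure state for one onTrigger call.
    static class RollbackContext extends RollbackOnFailure {
        RollbackContext(final boolean rollbackOnFailure) {
            super(rollbackOnFailure, false); // false: the target storage is assumed non-transactional here
        }
    }

    // Expose the shared property so users can toggle the behavior from the processor configuration.
    static final PropertyDescriptor ROLLBACK_ON_FAILURE = RollbackOnFailure.createRollbackOnFailureProperty("");

    private Put<RollbackContext, Connection> process;
    private ExceptionHandler<RollbackContext> exceptionHandler;

    private void configureRollback(final ComponentLog logger, final Relationship relFailure, final Relationship relRetry) {
        // Approach 1: adjust errors as soon as an Exception is thrown.
        exceptionHandler = new ExceptionHandler<>();
        exceptionHandler.mapException(e -> ErrorTypes.TemporalFailure); // illustrative mapping
        exceptionHandler.adjustError(RollbackOnFailure.createAdjustError(logger));

        // Approach 2: safety net that re-routes FlowFiles already routed to failure/retry.
        process = new Put<>();
        process.adjustRoute(RollbackOnFailure.createAdjustRoute(relFailure, relRetry));
    }

    private RollbackContext createFunctionContext(final ProcessContext context) {
        // Instantiate a fresh context per onTrigger, based on the user-facing property.
        return new RollbackContext(context.getProperty(ROLLBACK_ON_FAILURE).asBoolean());
    }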
http://git-wip-us.apache.org/repos/asf/nifi/blob/d9acdb54/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/pattern/RoutingResult.java
----------------------------------------------------------------------
diff --git a/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/pattern/RoutingResult.java b/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/pattern/RoutingResult.java
new file mode 100644
index 0000000..200d893
--- /dev/null
+++ b/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/pattern/RoutingResult.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processor.util.pattern;
+
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.Relationship;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class RoutingResult {
+
+ private final Map<Relationship, List<FlowFile>> routedFlowFiles = new HashMap<>();
+
+ public void routeTo(final FlowFile flowFile, final Relationship relationship) {
+ routedFlowFiles.computeIfAbsent(relationship, r -> new ArrayList<>()).add(flowFile);
+ }
+
+ public void routeTo(final List<FlowFile> flowFiles, final Relationship relationship) {
+ routedFlowFiles.computeIfAbsent(relationship, r -> new ArrayList<>()).addAll(flowFiles);
+ }
+
+ public void merge(final RoutingResult r) {
+ r.getRoutedFlowFiles().forEach((relationship, routedFlowFiles) -> routeTo(routedFlowFiles, relationship));
+ }
+
+ public Map<Relationship, List<FlowFile>> getRoutedFlowFiles() {
+ return routedFlowFiles;
+ }
+
+ public boolean contains(Relationship relationship) {
+ return routedFlowFiles.containsKey(relationship) && !routedFlowFiles.get(relationship).isEmpty();
+ }
+}
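
RoutingResult is just an accumulator of FlowFiles keyed by Relationship. A short sketch of typical use (not part of this commit; the relationships and variables are illustrative and assume the surrounding processor defines them):

    final RoutingResult result = new RoutingResult();

    // Accumulate routes while processing, instead of transferring one FlowFile at a time.
    result.routeTo(succeededFlowFile, REL_SUCCESS);
    result.routeTo(failedFlowFiles, REL_FAILURE);

    // AdjustRoute-style functions can inspect what has been routed so far.
    if (result.contains(REL_FAILURE)) {
        // e.g. re-route to Relationship.SELF or escalate to a ProcessException
    }

    // Finally, transfer everything in one pass.
    result.getRoutedFlowFiles().forEach((relationship, flowFiles) -> session.transfer(flowFiles, relationship));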
http://git-wip-us.apache.org/repos/asf/nifi/blob/d9acdb54/nifi-commons/nifi-processor-utilities/src/test/java/org/apache/nifi/processor/util/pattern/TestExceptionHandler.java
----------------------------------------------------------------------
diff --git a/nifi-commons/nifi-processor-utilities/src/test/java/org/apache/nifi/processor/util/pattern/TestExceptionHandler.java b/nifi-commons/nifi-processor-utilities/src/test/java/org/apache/nifi/processor/util/pattern/TestExceptionHandler.java
new file mode 100644
index 0000000..bd73379
--- /dev/null
+++ b/nifi-commons/nifi-processor-utilities/src/test/java/org/apache/nifi/processor/util/pattern/TestExceptionHandler.java
@@ -0,0 +1,202 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processor.util.pattern;
+
+import org.apache.nifi.processor.exception.ProcessException;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Function;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+public class TestExceptionHandler {
+
+ private static final Logger logger = LoggerFactory.getLogger(TestExceptionHandler.class);
+
+ /**
+ * Simulate an external procedure.
+ */
+ static class ExternalProcedure {
+ private boolean available = true;
+ int divide(Integer a, Integer b) throws Exception {
+ if (!available) {
+ throw new IOException("Not available");
+ }
+ if (a == 10) {
+ throw new IllegalStateException("Service for 10 is not currently available.");
+ }
+ return a / b;
+ }
+ }
+
+ private class Context {
+ int count = 0;
+ }
+
+ @Test
+ public void testBasicUsage() {
+
+ final ExternalProcedure p = new ExternalProcedure();
+
+ try {
+ // Although an exception has to be caught around each possible call,
+ // usually the error handling logic will be the same,
+ // which ends up duplicating a lot of the same code.
+ final int r1 = p.divide(4, 2);
+ assertEquals(2, r1);
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+
+ final Context context = new Context();
+ final ExceptionHandler<Context> handler = new ExceptionHandler<>();
+
+ // Using the handler avoids the try-catch block by providing reusable error handling logic.
+ handler.execute(context, 6, i -> {
+ final int r2 = p.divide(i, 2);
+ assertEquals(3, r2);
+ });
+
+ // If a return value is needed, use an AtomicReference.
+ AtomicReference<Integer> r = new AtomicReference<>();
+ handler.execute(context, 8, i -> r.set(p.divide(i, 2)));
+ assertEquals(4, r.get().intValue());
+
+ // If no exception mapping is specified, any Exception thrown is wrapped by ProcessException.
+ try {
+ final Integer nullInput = null;
+ handler.execute(context, nullInput, i -> r.set(p.divide(i, 2)));
+ fail("Exception should be thrown because input is null.");
+ } catch (ProcessException e) {
+ assertTrue(e.getCause() instanceof NullPointerException);
+ }
+ }
+
+ // Reusable Exception mapping function.
+ static Function<Exception, ErrorTypes> exceptionMapping = i -> {
+ try {
+ throw i;
+ } catch (NullPointerException | ArithmeticException | NumberFormatException e) {
+ return ErrorTypes.InvalidInput;
+ } catch (IllegalStateException e) {
+ return ErrorTypes.TemporalInputFailure;
+ } catch (IOException e) {
+ return ErrorTypes.TemporalFailure;
+ } catch (Exception e) {
+ throw new ProcessException(e);
+ }
+ };
+
+ @Test
+ public void testHandling() {
+
+ final ExternalProcedure p = new ExternalProcedure();
+ final Context context = new Context();
+
+ final ExceptionHandler<Context> handler = new ExceptionHandler<>();
+ handler.mapException(exceptionMapping);
+ handler.onError(createInputErrorHandler());
+
+ // The benefit of the handler is being able to externalize error handling, making the code simpler.
+ handler.execute(context, 4, i -> {
+ final int r = p.divide(i, 2);
+ assertEquals(2, r);
+ });
+
+ // Null pointer exception.
+ final Integer input = null;
+ handler.execute(context, input, i -> {
+ p.divide(i, 2);
+ fail("Shouldn't reach here.");
+ });
+
+ // Divide by zero.
+ handler.execute(context, 0, i -> {
+ p.divide(2, i);
+ fail("Shouldn't reach here.");
+ });
+
+
+ }
+
+ static <C> ExceptionHandler.OnError<C, Integer> createInputErrorHandler() {
+ return (c, i, r, e) -> {
+ switch (r.destination()) {
+ case ProcessException:
+ throw new ProcessException(String.format("Execution failed due to %s", e), e);
+ default:
+ logger.warn(String.format("Routing to %s: %d caused %s", r, i, e));
+ }
+ };
+ }
+
+ static <C> ExceptionHandler.OnError<C, Integer[]> createArrayInputErrorHandler() {
+ return (c, i, r, e) -> {
+ switch (r.destination()) {
+ case ProcessException:
+ throw new ProcessException(String.format("Execution failed due to %s", e), e);
+ default:
+ logger.warn(String.format("Routing to %s: %d, %d caused %s", r, i[0], i[1], e));
+ }
+ };
+ }
+
+ @Test
+ public void testHandlingLoop() {
+
+ final ExternalProcedure p = new ExternalProcedure();
+ final Context context = new Context();
+
+ final ExceptionHandler<Context> handler = new ExceptionHandler<>();
+ handler.mapException(exceptionMapping);
+ handler.onError(createArrayInputErrorHandler());
+
+ // It's especially handy when looping through inputs. [a, b, expected result]
+ Integer[][] inputs = new Integer[][]{{4, 2, 2}, {null, 2, 999}, {2, 0, 999}, {10, 2, 999}, {8, 2, 4}};
+
+ Arrays.stream(inputs).forEach(input -> handler.execute(context, input, (in) -> {
+ final Integer r = p.divide(in[0], in[1]);
+ // This is safe because if p.divide throws an error, this code won't be executed.
+ assertEquals(in[2], r);
+ }));
+
+ AtomicReference<Integer> r = new AtomicReference<>();
+ for (Integer[] input : inputs) {
+
+ if (!handler.execute(context, input, (in) -> {
+ r.set(p.divide(in[0], in[1]));
+ context.count++;
+ })){
+ // Handler returns false when it fails.
+ // The if-exception-then-continue-with-next-input flow can be written more cleanly.
+ continue;
+ }
+
+ assertEquals(input[2], r.get());
+ }
+
+ assertEquals("Successful inputs", 2, context.count);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/nifi/blob/d9acdb54/nifi-commons/nifi-processor-utilities/src/test/java/org/apache/nifi/processor/util/pattern/TestRollbackOnFailure.java
----------------------------------------------------------------------
diff --git a/nifi-commons/nifi-processor-utilities/src/test/java/org/apache/nifi/processor/util/pattern/TestRollbackOnFailure.java b/nifi-commons/nifi-processor-utilities/src/test/java/org/apache/nifi/processor/util/pattern/TestRollbackOnFailure.java
new file mode 100644
index 0000000..6d73759
--- /dev/null
+++ b/nifi-commons/nifi-processor-utilities/src/test/java/org/apache/nifi/processor/util/pattern/TestRollbackOnFailure.java
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processor.util.pattern;
+
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.pattern.TestExceptionHandler.ExternalProcedure;
+import org.apache.nifi.util.MockComponentLog;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.apache.nifi.processor.util.pattern.TestExceptionHandler.createArrayInputErrorHandler;
+import static org.apache.nifi.processor.util.pattern.TestExceptionHandler.exceptionMapping;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+public class TestRollbackOnFailure {
+
+ private static final Logger logger = LoggerFactory.getLogger(TestRollbackOnFailure.class);
+
+ /**
+ * This can be an example for how to compose an ExceptionHandler instance by reusable functions.
+ * @param logger used to log messages within functions
+ * @return a composed ExceptionHandler
+ */
+ private ExceptionHandler<RollbackOnFailure> getContextAwareExceptionHandler(ComponentLog logger) {
+ final ExceptionHandler<RollbackOnFailure> handler = new ExceptionHandler<>();
+ handler.mapException(exceptionMapping);
+ handler.adjustError(RollbackOnFailure.createAdjustError(logger));
+ handler.onError(createArrayInputErrorHandler());
+ return handler;
+ }
+
+ private void processInputs(RollbackOnFailure context, Integer[][] inputs, List<Integer> results) {
+ final ExternalProcedure p = new ExternalProcedure();
+ final MockComponentLog componentLog = new MockComponentLog("processor-id", this);
+ final ExceptionHandler<RollbackOnFailure> handler = getContextAwareExceptionHandler(componentLog);
+
+ for (Integer[] input : inputs) {
+
+ if (!handler.execute(context, input, (in) -> {
+ results.add(p.divide(in[0], in[1]));
+ context.proceed();
+ })){
+ continue;
+ }
+
+ assertEquals(input[2], results.get(results.size() - 1));
+ }
+ }
+
+ @Test
+ public void testContextDefaultBehavior() {
+
+ // With rollbackOnFailure disabled, errors are routed to Failure or Retry as usual.
+ final RollbackOnFailure context = new RollbackOnFailure(false, false);
+
+ Integer[][] inputs = new Integer[][]{{null, 2, 999}, {4, 2, 2}, {2, 0, 999}, {10, 2, 999}, {8, 2, 4}};
+
+ final List<Integer> results = new ArrayList<>();
+ try {
+ processInputs(context, inputs, results);
+ } catch (ProcessException e) {
+ fail("ProcessException should NOT be thrown");
+ }
+
+ assertEquals("Successful inputs", 2, context.getProcessedCount());
+ }
+
+ @Test
+ public void testContextRollbackOnFailureNonTransactionalFirstFailure() {
+
+ final RollbackOnFailure context = new RollbackOnFailure(true, false);
+
+ // If the first execution fails without any succeeded inputs, it should throw a ProcessException.
+ Integer[][] inputs = new Integer[][]{{null, 2, 999}, {4, 2, 2}, {2, 0, 999}, {10, 2, 999}, {8, 2, 4}};
+
+ final List<Integer> results = new ArrayList<>();
+ try {
+ processInputs(context, inputs, results);
+ fail("ProcessException should be thrown");
+ } catch (ProcessException e) {
+ logger.info("Exception was thrown as expected.");
+ }
+
+ assertEquals("Successful inputs", 0, context.getProcessedCount());
+ }
+
+ @Test
+ public void testContextRollbackOnFailureNonTransactionalAlreadySucceeded() {
+
+ final RollbackOnFailure context = new RollbackOnFailure(true, false);
+
+ // If an execution fails after some inputs have already succeeded, no ProcessException is thrown
+ // and processing keeps going, because the external system does not support transactions.
+ Integer[][] inputs = new Integer[][]{{4, 2, 2}, {2, 0, 999}, {null, 2, 999}, {10, 2, 999}, {8, 2, 4}};
+
+ final List<Integer> results = new ArrayList<>();
+ try {
+ processInputs(context, inputs, results);
+ } catch (ProcessException e) {
+ fail("ProcessException should NOT be thrown");
+ }
+
+ assertEquals("Successful inputs", 2, context.getProcessedCount());
+ }
+
+ @Test
+ public void testContextRollbackOnFailureTransactionalAlreadySucceeded() {
+
+ final RollbackOnFailure context = new RollbackOnFailure(true, true);
+
+ // Even if an execution fails after some inputs have succeeded, a ProcessException is thrown to roll back the session,
+ // because the external system supports transactions.
+ Integer[][] inputs = new Integer[][]{{4, 2, 2}, {2, 0, 999}, {null, 2, 999}, {10, 2, 999}, {8, 2, 4}};
+
+ final List<Integer> results = new ArrayList<>();
+ try {
+ processInputs(context, inputs, results);
+ fail("ProcessException should be thrown");
+ } catch (ProcessException e) {
+ logger.info("Exception was thrown as expected.");
+ }
+
+ }
+}
http://git-wip-us.apache.org/repos/asf/nifi/blob/d9acdb54/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/pom.xml
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/pom.xml b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/pom.xml
index ea6e5df..661180e 100644
--- a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/pom.xml
+++ b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/pom.xml
@@ -32,6 +32,10 @@
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-processor-utils</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.nifi</groupId>
<artifactId>nifi-dbcp-service-api</artifactId>
<scope>provided</scope>
</dependency>
http://git-wip-us.apache.org/repos/asf/nifi/blob/d9acdb54/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/processors/hive/AbstractHiveQLProcessor.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/processors/hive/AbstractHiveQLProcessor.java b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/processors/hive/AbstractHiveQLProcessor.java
index 3835ff7..1a2110a 100644
--- a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/processors/hive/AbstractHiveQLProcessor.java
+++ b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/processors/hive/AbstractHiveQLProcessor.java
@@ -19,9 +19,8 @@ package org.apache.nifi.processors.hive;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.dbcp.hive.HiveDBCPService;
import org.apache.nifi.flowfile.FlowFile;
-import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.AbstractSessionFactoryProcessor;
import org.apache.nifi.processor.ProcessSession;
-import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.io.InputStreamCallback;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.stream.io.StreamUtils;
@@ -30,6 +29,7 @@ import java.io.IOException;
import java.io.InputStream;
import java.math.BigDecimal;
import java.nio.charset.Charset;
+import java.sql.SQLDataException;
import java.sql.Time;
import java.sql.Timestamp;
import java.sql.Date;
@@ -45,7 +45,7 @@ import java.util.regex.Pattern;
/**
* An abstract base class for HiveQL processors to share common data, methods, etc.
*/
-public abstract class AbstractHiveQLProcessor extends AbstractProcessor {
+public abstract class AbstractHiveQLProcessor extends AbstractSessionFactoryProcessor {
protected static final Pattern HIVEQL_TYPE_ATTRIBUTE_PATTERN = Pattern.compile("hiveql\\.args\\.(\\d+)\\.type");
protected static final Pattern NUMBER_PATTERN = Pattern.compile("-?\\d+");
@@ -112,7 +112,7 @@ public abstract class AbstractHiveQLProcessor extends AbstractProcessor {
if (parameterIndex >= base && parameterIndex < base + paramCount) {
final boolean isNumeric = NUMBER_PATTERN.matcher(entry.getValue()).matches();
if (!isNumeric) {
- throw new ProcessException("Value of the " + key + " attribute is '" + entry.getValue() + "', which is not a valid JDBC numeral jdbcType");
+ throw new SQLDataException("Value of the " + key + " attribute is '" + entry.getValue() + "', which is not a valid JDBC numeral jdbcType");
}
final String valueAttrName = "hiveql.args." + parameterIndex + ".value";
@@ -139,7 +139,7 @@ public abstract class AbstractHiveQLProcessor extends AbstractProcessor {
try {
setParameter(stmt, ph.attributeName, index, ph.value, ph.jdbcType);
} catch (final NumberFormatException nfe) {
- throw new ProcessException("The value of the " + ph.attributeName + " is '" + ph.value + "', which cannot be converted into the necessary data jdbcType", nfe);
+ throw new SQLDataException("The value of the " + ph.attributeName + " is '" + ph.value + "', which cannot be converted into the necessary data jdbcType", nfe);
}
}
return base + paramCount;