You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ma...@apache.org on 2017/04/27 17:56:59 UTC
[5/5] nifi git commit: NIFI-3415: Add Rollback on Failure.
NIFI-3415: Add Rollback on Failure.
- Added org.apache.nifi.processor.util.pattern package in nifi-processor-utils containing reusable functions to mix-in 'Rollback on Failure' capability.
- Created process pattern classes, Put and PutGroup, which help standardize Processor implementations.
- Applied Rollback on Failure to PutSQL, PutHiveQL, PutHiveStreaming and PutDatabaseRecord.
- Stopped using AbstractProcessor for these processors, as it penalizes FlowFiles being processed when it rolls back a process session. Penalized FlowFiles are not fetched again until the penalization expires.
- Yield the processor when a failure occurs and RollbackOnFailure is enabled. If we neither penalize nor yield, a failed FlowFile is retried too frequently.
- When Rollback on Failure is enabled but the processor is not transactional, discontinue processing when an error occurs after some inputs have already been processed successfully.
- Fixed existing issues on PutHiveStreaming:
- Output FlowFile Avro format was corrupted by concatenating multiple Avro files.
- Output FlowFile records had incorrect values because of reusing GenericRecord instance.
Signed-off-by: Matt Burgess <ma...@apache.org>
This closes #1658
Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/d9acdb54
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/d9acdb54
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/d9acdb54
Branch: refs/heads/master
Commit: d9acdb54bec96695837f8fcde54c58403aa46f29
Parents: a1bffbc
Author: Koji Kawamura <ij...@apache.org>
Authored: Thu Mar 2 09:51:12 2017 +0900
Committer: Matt Burgess <ma...@apache.org>
Committed: Thu Apr 27 13:44:56 2017 -0400
----------------------------------------------------------------------
nifi-commons/nifi-processor-utilities/pom.xml | 10 +
.../util/pattern/DiscontinuedException.java | 31 +
.../nifi/processor/util/pattern/ErrorTypes.java | 148 ++++
.../util/pattern/ExceptionHandler.java | 235 ++++++
.../util/pattern/PartialFunctions.java | 122 +++
.../apache/nifi/processor/util/pattern/Put.java | 228 ++++++
.../nifi/processor/util/pattern/PutGroup.java | 97 +++
.../util/pattern/RollbackOnFailure.java | 226 ++++++
.../processor/util/pattern/RoutingResult.java | 50 ++
.../util/pattern/TestExceptionHandler.java | 202 +++++
.../util/pattern/TestRollbackOnFailure.java | 144 ++++
.../nifi-hive-processors/pom.xml | 4 +
.../hive/AbstractHiveQLProcessor.java | 10 +-
.../apache/nifi/processors/hive/PutHiveQL.java | 160 ++--
.../nifi/processors/hive/PutHiveStreaming.java | 575 ++++++++------
.../nifi/processors/hive/SelectHiveQL.java | 8 +-
.../nifi/processors/hive/TestPutHiveQL.java | 178 ++++-
.../processors/hive/TestPutHiveStreaming.java | 493 +++++++++++-
.../processors/standard/PutDatabaseRecord.java | 747 ++++++++++---------
.../apache/nifi/processors/standard/PutSQL.java | 723 ++++++++++--------
.../standard/TestPutDatabaseRecord.groovy | 142 +++-
.../nifi/processors/standard/TestPutSQL.java | 328 ++++++++
22 files changed, 3856 insertions(+), 1005 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/nifi/blob/d9acdb54/nifi-commons/nifi-processor-utilities/pom.xml
----------------------------------------------------------------------
diff --git a/nifi-commons/nifi-processor-utilities/pom.xml b/nifi-commons/nifi-processor-utilities/pom.xml
index 054f89b..ce5ae0b 100644
--- a/nifi-commons/nifi-processor-utilities/pom.xml
+++ b/nifi-commons/nifi-processor-utilities/pom.xml
@@ -53,5 +53,15 @@
<artifactId>nifi-ssl-context-service-api</artifactId>
<scope>provided</scope>
</dependency>
+ <dependency>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-mock</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ <scope>test</scope>
+ </dependency>
</dependencies>
</project>
http://git-wip-us.apache.org/repos/asf/nifi/blob/d9acdb54/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/pattern/DiscontinuedException.java
----------------------------------------------------------------------
diff --git a/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/pattern/DiscontinuedException.java b/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/pattern/DiscontinuedException.java
new file mode 100644
index 0000000..f97f31d
--- /dev/null
+++ b/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/pattern/DiscontinuedException.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processor.util.pattern;
+
+/**
+ * Signals that a looping process was discontinued. This is an unchecked exception.
+ * When a method throws this exception, its caller should stop immediately without processing any further inputs.
+ */
+public class DiscontinuedException extends RuntimeException {
+ public DiscontinuedException(String message) { // message only; no underlying cause is recorded
+ super(message);
+ }
+
+ public DiscontinuedException(String message, Throwable cause) { // keeps the causing exception attached
+ super(message, cause);
+ }
+}
http://git-wip-us.apache.org/repos/asf/nifi/blob/d9acdb54/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/pattern/ErrorTypes.java
----------------------------------------------------------------------
diff --git a/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/pattern/ErrorTypes.java b/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/pattern/ErrorTypes.java
new file mode 100644
index 0000000..c6cf140
--- /dev/null
+++ b/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/pattern/ErrorTypes.java
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processor.util.pattern;
+
+import static org.apache.nifi.processor.util.pattern.ErrorTypes.Destination.Failure;
+import static org.apache.nifi.processor.util.pattern.ErrorTypes.Destination.ProcessException;
+import static org.apache.nifi.processor.util.pattern.ErrorTypes.Destination.Retry;
+import static org.apache.nifi.processor.util.pattern.ErrorTypes.Destination.Self;
+import static org.apache.nifi.processor.util.pattern.ErrorTypes.Penalty.None;
+import static org.apache.nifi.processor.util.pattern.ErrorTypes.Penalty.Penalize;
+import static org.apache.nifi.processor.util.pattern.ErrorTypes.Penalty.Yield;
+
+/**
+ * Represents general error types and how each of them should be treated.
+ */
+public enum ErrorTypes {
+
+ /**
+ * Procedure setting has to be fixed, otherwise the same error would occur regardless of the input.
+ * In order to NOT call the failing process frequently, this should be yielded.
+ */
+ PersistentFailure(ProcessException, Yield),
+
+ /**
+ * It is unknown whether the error is persistent or temporal, and whether it is related to the input or not.
+ */
+ UnknownFailure(ProcessException, None),
+
+ /**
+ * The input will be sent to the failure route for recovery without penalizing.
+ * Basically, the input should not be sent to the same procedure again unless the issue has been solved.
+ */
+ InvalidInput(Failure, None),
+
+ /**
+ * The procedure is temporarily unavailable, usually due to the external service unavailability.
+ * Retrying may be successful, but it should be yielded for a while.
+ */
+ TemporalFailure(Retry, Yield),
+
+ /**
+ * The input was not processed successfully due to some temporal error
+ * related to the specifics of the input. Retrying may be successful,
+ * but it should be penalized for a while.
+ */
+ TemporalInputFailure(Retry, Penalize),
+
+ /**
+ * The input was not ready for being processed. It will be kept in the incoming queue and also be penalized.
+ */
+ Defer(Self, Penalize);
+
+ private final Destination destination;
+ private final Penalty penalty;
+ ErrorTypes(Destination destination, Penalty penalty){
+ this.destination = destination;
+ this.penalty = penalty;
+ }
+
+ public Result result() { // builds a new Result on every call from this constant's destination and penalty
+ return new Result(destination, penalty);
+ }
+
+ /**
+ * Represents the destination of the input.
+ */
+ public enum Destination {
+ ProcessException, Failure, Retry, Self
+ }
+
+ /**
+ * Indicates whether to yield or to penalize when the input is transferred, or to do neither.
+ */
+ public enum Penalty {
+ Yield, Penalize, None
+ }
+
+ public Destination destination(){
+ return this.destination;
+ }
+
+ public Penalty penalty(){
+ return this.penalty;
+ }
+
+ /**
+ * Result represents the result of a procedure: a destination paired with a penalty.
+ * The ErrorTypes enum contains basic error result patterns.
+ */
+ public static class Result {
+ private final Destination destination;
+ private final Penalty penalty;
+
+ public Result(Destination destination, Penalty penalty) {
+ this.destination = destination;
+ this.penalty = penalty;
+ }
+
+ public Destination destination() {
+ return destination;
+ }
+
+ public Penalty penalty() {
+ return penalty;
+ }
+
+ @Override
+ public String toString() {
+ return "Result{" +
+ "destination=" + destination +
+ ", penalty=" + penalty +
+ '}';
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false; // only an instance of the exact same class can be equal
+
+ Result result = (Result) o;
+
+ if (destination != result.destination) return false; // enum constants, so identity comparison is sufficient
+ return penalty == result.penalty;
+ }
+
+ @Override
+ public int hashCode() {
+ int result = destination != null ? destination.hashCode() : 0; // a null field contributes 0
+ result = 31 * result + (penalty != null ? penalty.hashCode() : 0);
+ return result;
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/nifi/blob/d9acdb54/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/pattern/ExceptionHandler.java
----------------------------------------------------------------------
diff --git a/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/pattern/ExceptionHandler.java b/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/pattern/ExceptionHandler.java
new file mode 100644
index 0000000..bd1c9eb
--- /dev/null
+++ b/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/pattern/ExceptionHandler.java
@@ -0,0 +1,235 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processor.util.pattern;
+
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.pattern.ErrorTypes.Result;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.function.BiFunction;
+import java.util.function.Function;
+
+/**
+ * <p>ExceptionHandler provides a structured Exception handling logic composed of reusable partial functions.
+ *
+ * <p>
+ * Benefits of using ExceptionHandler:
+ * <li>Externalized error handling code, which keeps the main program clean and focused on the expected path.</li>
+ * <li>Classify specific Exceptions into {@link ErrorTypes}, consolidated error handling based on error type.</li>
+ * <li>Context aware error handling, {@link RollbackOnFailure} for instance.</li>
+ * </p>
+ */
+public class ExceptionHandler<C> {
+
+ @FunctionalInterface
+ public interface Procedure<I> {
+ void apply(I input) throws Exception; // may throw any Exception; execute() catches and classifies it
+ }
+
+ public interface OnError<C, I> {
+ void apply(C context, I input, Result result, Exception e);
+
+ default OnError<C, I> andThen(OnError<C, I> after) { // composes: this handler first, then 'after', with the same arguments
+ return (c, i, r, e) -> {
+ apply(c, i, r, e);
+ after.apply(c, i, r, e);
+ };
+ }
+ }
+
+ /**
+ * Simply categorise an Exception.
+ */
+ private Function<Exception, ErrorTypes> mapException;
+
+ /**
+ * Adjust error type based on the context.
+ */
+ private BiFunction<C, ErrorTypes, Result> adjustError;
+
+ /**
+ * Do some action to the input based on the final error type.
+ */
+ private OnError<C, ?> onError;
+
+ /**
+ * Specify a function that maps an Exception to a certain ErrorType.
+ */
+ public void mapException(Function<Exception, ErrorTypes> mapException) {
+ this.mapException = mapException;
+ }
+
+ /**
+ * <p>Specify a function that adjusts ErrorType based on a function context.
+ * <p>For example, {@link RollbackOnFailure#createAdjustError(ComponentLog)} decides
+ * whether a process session should rollback or transfer input to failure or retry.
+ */
+ public void adjustError(BiFunction<C, ErrorTypes, Result> adjustError) {
+ this.adjustError = adjustError;
+ }
+
+ /**
+ * <p>Specify a default OnError function that will be called if one is not explicitly specified when {@link #execute(Object, Object, Procedure)} is called.
+ */
+ public void onError(OnError<C, ?> onError) {
+ this.onError = onError;
+ }
+
+ /**
+ * <p>Executes specified procedure function with the input.
+ * <p>Default OnError function will be called when an exception is thrown.
+ * @param context function context
+ * @param input input for procedure
+ * @param procedure a function that does something with the input
+ * @return True if the procedure finished without issue. False if procedure threw an Exception but it was handled by {@link OnError}.
+ * @throws ProcessException Thrown if the exception was not handled by {@link OnError}
+ * @throws DiscontinuedException Indicating the exception was handled by {@link OnError} but process should stop immediately
+ * without processing any further input
+ */
+ @SuppressWarnings("unchecked")
+ public <I> boolean execute(C context, I input, Procedure<I> procedure) throws ProcessException, DiscontinuedException {
+ return execute(context, input, procedure, (OnError<C, I>) onError); // unchecked cast: the default handler is stored as OnError<C, ?>
+ }
+
+ /**
+ * <p>Executes specified procedure function with the input.
+ * @param context function context
+ * @param input input for procedure
+ * @param procedure a function that does something with the input
+ * @param onError specify {@link OnError} function for this execution
+ * @return True if the procedure finished without issue. False if procedure threw an Exception but it was handled by {@link OnError}.
+ * @throws ProcessException Thrown if the exception was not handled by {@link OnError}
+ * @throws DiscontinuedException Indicating the exception was handled by {@link OnError} but process should stop immediately
+ * without processing any further input
+ */
+ public <I> boolean execute(C context, I input, Procedure<I> procedure, OnError<C, I> onError) throws ProcessException, DiscontinuedException {
+ try {
+ procedure.apply(input);
+ return true;
+ } catch (Exception e) {
+
+ if (mapException == null) { // without a mapping function the exception cannot be classified, so it is wrapped and rethrown
+ throw new ProcessException("An exception was thrown: " + e, e);
+ }
+
+ final ErrorTypes type = mapException.apply(e);
+
+ final Result result;
+ if (adjustError != null) {
+ result = adjustError.apply(context, type);
+ } else {
+ result = new Result(type.destination(), type.penalty()); // no adjustment configured: use the error type's own destination and penalty
+ }
+
+ if (onError == null) { // checked only after an exception occurred, i.e. a missing handler goes unnoticed on the success path
+ throw new IllegalStateException("OnError is not set.");
+ }
+
+ onError.apply(context, input, result, e);
+ }
+ return false; // reached only when onError returned normally
+ }
+
+ private static FlowFile penalize(final ProcessContext context, final ProcessSession session,
+ final FlowFile flowFile, final ErrorTypes.Penalty penalty) { // returns the result of session.penalize for Penalize, otherwise the given FlowFile
+ switch (penalty) {
+ case Penalize:
+ return session.penalize(flowFile);
+ case Yield:
+ context.yield(); // last case, no break needed: execution continues to the return below
+ }
+ return flowFile;
+ }
+
+ /**
+ * Create a {@link OnError} function instance that routes input based on {@link Result} destination and penalty.
+ * @param context process context is used to yield a processor
+ * @param session process session is used to penalize a FlowFile
+ * @param routingResult input FlowFile will be routed to a destination relationship in this {@link RoutingResult}
+ * @param relFailure specify failure relationship of a processor
+ * @param relRetry specify retry relationship of a processor
+ * @return composed function
+ */
+ public static <C> ExceptionHandler.OnError<C, FlowFile> createOnError(
+ final ProcessContext context, final ProcessSession session, final RoutingResult routingResult,
+ final Relationship relFailure, final Relationship relRetry) {
+
+ return (fc, input, result, e) -> {
+ final PartialFunctions.FlowFileGroup flowFileGroup = () -> Collections.singletonList(input); // wrap the single FlowFile in a one-element group
+ createOnGroupError(context, session, routingResult, relFailure, relRetry).apply(fc, flowFileGroup, result, e);
+ };
+ }
+
+ /**
+ * Same as {@link #createOnError(ProcessContext, ProcessSession, RoutingResult, Relationship, Relationship)} for FlowFileGroup.
+ * @param context process context is used to yield a processor
+ * @param session process session is used to penalize FlowFiles
+ * @param routingResult input FlowFiles will be routed to a destination relationship in this {@link RoutingResult}
+ * @param relFailure specify failure relationship of a processor
+ * @param relRetry specify retry relationship of a processor
+ * @return composed function
+ */
+ public static <C, I extends PartialFunctions.FlowFileGroup> ExceptionHandler.OnError<C, I> createOnGroupError(
+ final ProcessContext context, final ProcessSession session, final RoutingResult routingResult,
+ final Relationship relFailure, final Relationship relRetry) {
+ return (c, g, r, e) -> {
+ final Relationship routeTo;
+ switch (r.destination()) {
+ case Failure:
+ routeTo = relFailure;
+ break;
+ case Retry:
+ routeTo = relRetry;
+ break;
+ case Self:
+ routeTo = Relationship.SELF;
+ break;
+ default: // remaining destination (ProcessException): nothing is routed, an exception is thrown instead
+ if (e instanceof ProcessException) {
+ throw (ProcessException)e; // already a ProcessException: rethrow it unwrapped
+ } else {
+ Object inputs = null; // stays null (printed as "null" in the message) when no group was given
+ if (g != null) {
+ final List<FlowFile> flowFiles = g.getFlowFiles();
+ switch (flowFiles.size()) {
+ case 0:
+ inputs = "[]";
+ break;
+ case 1:
+ inputs = flowFiles.get(0);
+ break;
+ default:
+ inputs = String.format("%d FlowFiles including %s", flowFiles.size(), flowFiles.get(0)); // only the first FlowFile is named
+ break;
+ }
+ }
+ throw new ProcessException(String.format("Failed to process %s due to %s", inputs, e), e);
+ }
+ }
+ for (FlowFile f : g.getFlowFiles()) { // NOTE(review): g is dereferenced here without the null check used in the default branch above
+ final FlowFile maybePenalized = penalize(context, session, f, r.penalty()); // for Yield, context.yield() is invoked once per FlowFile
+ routingResult.routeTo(maybePenalized, routeTo);
+ }
+ };
+ }
+}
http://git-wip-us.apache.org/repos/asf/nifi/blob/d9acdb54/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/pattern/PartialFunctions.java
----------------------------------------------------------------------
diff --git a/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/pattern/PartialFunctions.java b/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/pattern/PartialFunctions.java
new file mode 100644
index 0000000..8332289
--- /dev/null
+++ b/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/pattern/PartialFunctions.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processor.util.pattern;
+
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.ProcessSessionFactory;
+import org.apache.nifi.processor.exception.ProcessException;
+
+import java.util.List;
+
+/**
+ * This class contains various partial functions that are reusable among process patterns.
+ */
+public class PartialFunctions {
+
+ @FunctionalInterface
+ public interface InitConnection<FC, C> {
+ C apply(ProcessContext context, ProcessSession session, FC functionContext) throws ProcessException;
+ }
+
+ @FunctionalInterface
+ public interface FetchFlowFiles<FC> {
+ List<FlowFile> apply(ProcessContext context, ProcessSession session, FC functionContext, RoutingResult result) throws ProcessException;
+ }
+
+ @FunctionalInterface
+ public interface OnCompleted<FC, C> {
+ void apply(ProcessContext context, ProcessSession session, FC functionContext, C connection) throws ProcessException;
+ }
+
+ @FunctionalInterface
+ public interface OnFailed<FC, C> {
+ void apply(ProcessContext context, ProcessSession session, FC functionContext, C connection, Exception e) throws ProcessException;
+ }
+
+ @FunctionalInterface
+ public interface Cleanup<FC, C> {
+ void apply(ProcessContext context, ProcessSession session, FC functionContext, C connection) throws ProcessException;
+ }
+
+ @FunctionalInterface
+ public interface FlowFileGroup {
+ List<FlowFile> getFlowFiles();
+ }
+
+ @FunctionalInterface
+ public interface AdjustRoute<FC> {
+ void apply(ProcessContext context, ProcessSession session, FC functionContext, RoutingResult result) throws ProcessException;
+ }
+
+ @FunctionalInterface
+ public interface TransferFlowFiles<FC> {
+ void apply(ProcessContext context, ProcessSession session, FC functionContext, RoutingResult result) throws ProcessException;
+
+ default TransferFlowFiles<FC> andThen(TransferFlowFiles<FC> after) { // composes: this function first, then 'after', with the same arguments
+ return (context, session, functionContext, result) -> {
+ apply(context, session, functionContext, result);
+ after.apply(context, session, functionContext, result);
+ };
+ }
+ }
+
+ public static <FCT> PartialFunctions.FetchFlowFiles<FCT> fetchSingleFlowFile() {
+ return (context, session, functionContext, result) -> session.get(1); // requests a batch limited to one FlowFile from the session
+ }
+
+ public static <FCT> PartialFunctions.TransferFlowFiles<FCT> transferRoutedFlowFiles() {
+ return (context, session, functionContext, result) // transfers every routed group of FlowFiles to the relationship it is keyed by
+ -> result.getRoutedFlowFiles().forEach(((relationship, routedFlowFiles)
+ -> session.transfer(routedFlowFiles, relationship)));
+ }
+
+ @FunctionalInterface
+ public interface OnTrigger {
+ void execute(ProcessSession session) throws ProcessException;
+ }
+
+ @FunctionalInterface
+ public interface RollbackSession {
+ void rollback(ProcessSession session, Throwable t);
+ }
+
+ /**
+ * <p>This method is identical to what {@link org.apache.nifi.processor.AbstractProcessor#onTrigger(ProcessContext, ProcessSession)} does.</p>
+ * <p>Creates a session from ProcessSessionFactory, executes the specified onTrigger function, and commits the session if onTrigger finishes successfully.</p>
+ * <p>When an Exception is thrown during execution of the onTrigger, the session will be rolled back. FlowFiles being processed will be penalized.</p>
+ */
+ public static void onTrigger(ProcessContext context, ProcessSessionFactory sessionFactory, ComponentLog logger, OnTrigger onTrigger) throws ProcessException {
+ onTrigger(context, sessionFactory, logger, onTrigger, (session, t) -> session.rollback(true)); // default rollback strategy: rollback(true), the penalizing behavior described above
+ }
+
+ public static void onTrigger( // same flow as the overload above, but the caller supplies how the session is rolled back
+ ProcessContext context, ProcessSessionFactory sessionFactory, ComponentLog logger, OnTrigger onTrigger,
+ RollbackSession rollbackSession) throws ProcessException {
+ final ProcessSession session = sessionFactory.createSession();
+ try {
+ onTrigger.execute(session);
+ session.commit(); // inside the try block, so a failing commit is logged and rolled back as well
+ } catch (final Throwable t) {
+ logger.error("{} failed to process due to {}; rolling back session", new Object[]{onTrigger, t});
+ rollbackSession.rollback(session, t);
+ throw t; // rethrown after the rollback so the caller still sees the failure
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/nifi/blob/d9acdb54/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/pattern/Put.java
----------------------------------------------------------------------
diff --git a/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/pattern/Put.java b/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/pattern/Put.java
new file mode 100644
index 0000000..790f48a
--- /dev/null
+++ b/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/pattern/Put.java
@@ -0,0 +1,228 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processor.util.pattern;
+
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+
+import java.util.List;
+import java.util.Objects;
+import java.util.stream.Collectors;
+
+/**
+ * Abstract Put pattern class with a generic onTrigger method structure, composed with various partial functions.
+ * @param <FC> Class of context instance which is passed to each partial function.
+ * Lifetime of a function context should be limited to a single onTrigger call.
+ * @param <C> Class of connection to a data storage that this pattern puts data into.
+ */
+public class Put<FC, C extends AutoCloseable> {
+ protected PartialFunctions.InitConnection<FC, C> initConnection;
+ protected PartialFunctions.FetchFlowFiles<FC> fetchFlowFiles = PartialFunctions.fetchSingleFlowFile();
+ protected PutFlowFile<FC, C> putFlowFile;
+ protected PartialFunctions.TransferFlowFiles<FC> transferFlowFiles = PartialFunctions.transferRoutedFlowFiles();
+ protected PartialFunctions.AdjustRoute<FC> adjustRoute;
+ protected PartialFunctions.OnCompleted<FC, C> onCompleted;
+ protected PartialFunctions.OnFailed<FC, C> onFailed;
+ protected PartialFunctions.Cleanup<FC, C> cleanup;
+ protected ComponentLog logger;
+
+ /**
+  * Puts the fetched FlowFiles to a data storage, handing them one by one, in list order,
+  * to the configured {@link PutFlowFile} function.
+  * @param context process context passed from a Processor onTrigger.
+  * @param session process session passed from a Processor onTrigger.
+  * @param functionContext function context passed from a Processor onTrigger.
+  * @param connection connection to data storage, established by {@link PartialFunctions.InitConnection}.
+  * @param flowFiles FlowFiles fetched from {@link PartialFunctions.FetchFlowFiles}.
+  * @param result routing result the put function can route each incoming FlowFile into, if necessary.
+  */
+ protected void putFlowFiles(ProcessContext context, ProcessSession session,
+         FC functionContext, C connection, List<FlowFile> flowFiles, RoutingResult result) throws ProcessException {
+     flowFiles.forEach(incoming ->
+             putFlowFile.apply(context, session, functionContext, connection, incoming, result));
+ }
+
+ /**
+  * Verifies that every function this pattern calls unconditionally has been specified.
+  * Subclasses can override this method to require additional functions.
+  * @throws NullPointerException with a descriptive message if a required function is missing.
+  */
+ protected void validateCompositePattern() {
+     Objects.requireNonNull(initConnection, "InitConnection function is required.");
+     Objects.requireNonNull(putFlowFile, "PutFlowFile function is required.");
+     Objects.requireNonNull(transferFlowFiles, "TransferFlowFiles function is required.");
+     // FetchFlowFiles has a default, but it can be overwritten with null via its setter.
+     // Checked last so that the messages reported for the other functions stay the same as before.
+     Objects.requireNonNull(fetchFlowFiles, "FetchFlowFiles function is required.");
+ }
+
+ /**
+ * <p>Processor using this pattern is expected to call this method from its onTrigger.
+ * <p>Typical usage would be constructing a process pattern instance at a processor method
+ * which is annotated with {@link org.apache.nifi.annotation.lifecycle.OnScheduled},
+ * and use pattern.onTrigger from processor.onTrigger.
+ * <p>{@link PartialFunctions.InitConnection} is required at least. In addition to any functions required by an implementation class.
+ * @param context process context passed from a Processor onTrigger.
+ * @param session process session passed from a Processor onTrigger.
+ * @param functionContext function context should be instantiated per onTrigger call.
+ * @throws ProcessException Each partial function can throw ProcessException if onTrigger should stop immediately.
+ */
+ public void onTrigger(ProcessContext context, ProcessSession session, FC functionContext) throws ProcessException {
+
+ validateCompositePattern();
+
+ final RoutingResult result = new RoutingResult();
+ final List<FlowFile> flowFiles = fetchFlowFiles.apply(context, session, functionContext, result);
+
+ // Transfer FlowFiles if there is any.
+ result.getRoutedFlowFiles().forEach(((relationship, routedFlowFiles) ->
+ session.transfer(routedFlowFiles, relationship)));
+
+ if (flowFiles == null || flowFiles.isEmpty()) {
+ logger.debug("No incoming FlowFiles.");
+ return;
+ }
+
+ try (C connection = initConnection.apply(context, session, functionContext)) {
+
+ try {
+ // Execute the core function.
+ try {
+ putFlowFiles(context, session, functionContext, connection, flowFiles, result);
+ } catch (DiscontinuedException e) {
+ // Whether it was an error or semi normal is depends on the implementation and reason why it wanted to discontinue.
+ // So, no logging is needed here.
+ }
+
+ // Extension point to alter routes.
+ if (adjustRoute != null) {
+ adjustRoute.apply(context, session, functionContext, result);
+ }
+
+ // Put fetched, but unprocessed FlowFiles back to self.
+ final List<FlowFile> transferredFlowFiles = result.getRoutedFlowFiles().values().stream()
+ .flatMap(List::stream).collect(Collectors.toList());
+ final List<FlowFile> unprocessedFlowFiles = flowFiles.stream()
+ .filter(flowFile -> !transferredFlowFiles.contains(flowFile)).collect(Collectors.toList());
+ result.routeTo(unprocessedFlowFiles, Relationship.SELF);
+
+ // OnCompleted processing.
+ if (onCompleted != null) {
+ onCompleted.apply(context, session, functionContext, connection);
+ }
+
+ // Transfer FlowFiles.
+ transferFlowFiles.apply(context, session, functionContext, result);
+
+ } catch (Exception e) {
+ if (onFailed != null) {
+ onFailed.apply(context, session, functionContext, connection, e);
+ }
+ throw e;
+ } finally {
+ if (cleanup != null) {
+ cleanup.apply(context, session, functionContext, connection);
+ }
+ }
+
+ } catch (ProcessException e) {
+ throw e;
+ } catch (Exception e) {
+ // Throw uncaught exception as RuntimeException so that this processor will be yielded.
+ final String msg = String.format("Failed to execute due to %s", e);
+ logger.error(msg, e);
+ throw new RuntimeException(msg, e);
+ }
+
+ }
+
+ /**
+  * Sets the optional function that fetches incoming FlowFiles.
+  * When this is not called, a single FlowFile is fetched on each onTrigger.
+  * @param f Function to fetch incoming FlowFiles.
+  */
+ public void fetchFlowFiles(PartialFunctions.FetchFlowFiles<FC> f) {
+     this.fetchFlowFiles = f;
+ }
+
+ /**
+  * Sets the function that establishes a connection to the target data storage.
+  * It is called only when there are incoming FlowFiles to process.
+  * The created connection instance is automatically closed when onTrigger is finished.
+  * @param f Function to initiate a connection to a data storage.
+  */
+ public void initConnection(PartialFunctions.InitConnection<FC, C> f) {
+     this.initConnection = f;
+ }
+
+ /**
+  * Sets the function that puts an incoming FlowFile to the target data storage.
+  * @param f a function to put a FlowFile to target storage.
+  */
+ public void putFlowFile(PutFlowFile<FC, C> f) {
+     this.putFlowFile = f;
+ }
+
+ /**
+  * Sets the optional function that adjusts routed FlowFiles before they are transferred.
+  * @param f a function to adjust route.
+  */
+ public void adjustRoute(PartialFunctions.AdjustRoute<FC> f) {
+     this.adjustRoute = f;
+ }
+
+ /**
+  * Sets the optional function responsible for transferring routed FlowFiles.
+  * When this is not called, routed FlowFiles are simply transferred to their destinations.
+  * @param f a function to transfer routed FlowFiles.
+  */
+ public void transferFlowFiles(PartialFunctions.TransferFlowFiles<FC> f) {
+     this.transferFlowFiles = f;
+ }
+
+ /**
+  * Sets the optional function which is called if input FlowFiles were successfully put to a target storage.
+  * @param f Function to be called when a put operation finishes successfully.
+  */
+ public void onCompleted(PartialFunctions.OnCompleted<FC, C> f) {
+     this.onCompleted = f;
+ }
+
+ /**
+  * Sets the optional function which is called if putting input FlowFiles to a target storage failed.
+  * @param f Function to be called when a put operation failed.
+  */
+ public void onFailed(PartialFunctions.OnFailed<FC, C> f) {
+     this.onFailed = f;
+ }
+
+ /**
+  * Sets the optional function which is called in a finally block.
+  * Typically useful when a special cleanup operation is needed for the connection.
+  * @param f Function to be called when a put operation finished regardless of whether it succeeded or not.
+  */
+ public void cleanup(PartialFunctions.Cleanup<FC, C> f) {
+     this.cleanup = f;
+ }
+
+ /**
+  * Sets the logger this pattern writes its own messages to.
+  * @param logger the logger to use.
+  */
+ public void setLogger(ComponentLog logger) {
+     this.logger = logger;
+ }
+
+ /**
+ * Function that puts a single FlowFile to a data storage using the given connection.
+ * @param <FC> Class of the function context.
+ * @param <C> Class of the connection.
+ */
+ @FunctionalInterface
+ public interface PutFlowFile<FC, C> {
+ /**
+ * @param flowFile the FlowFile to put
+ * @param result routing result the implementation can route the FlowFile into
+ * @throws ProcessException declared so that an implementation can abort the processing
+ */
+ void apply(ProcessContext context, ProcessSession session, FC functionContext, C connection,
+ FlowFile flowFile, RoutingResult result) throws ProcessException;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/nifi/blob/d9acdb54/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/pattern/PutGroup.java
----------------------------------------------------------------------
diff --git a/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/pattern/PutGroup.java b/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/pattern/PutGroup.java
new file mode 100644
index 0000000..6e9da2e
--- /dev/null
+++ b/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/pattern/PutGroup.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processor.util.pattern;
+
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.exception.ProcessException;
+
+import java.util.List;
+import java.util.Objects;
+
+/**
+ * Extended Put pattern capable of handling FlowFile groups.
+ * @param <FC> Function context class.
+ * @param <C> Connection class.
+ * @param <FFG> FlowFileGroup class.
+ */
+public class PutGroup<FC, C extends AutoCloseable, FFG extends PartialFunctions.FlowFileGroup> extends Put<FC, C> {
+
+
+ public PutGroup() {
+ // Just to make a composition valid.
+ this.putFlowFile = (context, session, functionContext, connection, inputFlowFile, result) -> {
+ throw new UnsupportedOperationException();
+ };
+ }
+
+ @FunctionalInterface
+ public interface PutFlowFiles<FC, C, FFG> {
+ void apply(ProcessContext context, ProcessSession session, FC functionContext, C connection,
+ FFG inputFlowFileGroup, RoutingResult result) throws ProcessException;
+ }
+
+ @Override
+ protected void validateCompositePattern() {
+ super.validateCompositePattern();
+ Objects.requireNonNull(groupFlowFiles, "GroupFlowFiles function is required.");
+ }
+
+ /**
+ * PutGroup does not support PutFileFile function for single FlowFile.
+ * Throws UnsupportedOperationException if called.
+ */
+ @Override
+ public void putFlowFile(PutFlowFile<FC, C> putFlowFile) {
+ throw new UnsupportedOperationException("PutFlowFile can not be used with PutGroup pattern. Specify PutFlowFiles instead.");
+ }
+
+ @FunctionalInterface
+ public interface GroupFlowFiles<FC, C, FFG> {
+ List<FFG> apply(ProcessContext context, ProcessSession session, FC functionContext, C connection, List<FlowFile> flowFiles, RoutingResult result) throws ProcessException;
+ }
+
+ private GroupFlowFiles<FC, C, FFG> groupFlowFiles;
+ private PutFlowFiles<FC, C, FFG> putFlowFiles;
+
+ /**
+ * Specify a function that groups input FlowFiles into FlowFile groups.
+ */
+ public void groupFetchedFlowFiles(GroupFlowFiles<FC, C, FFG> f) {
+ groupFlowFiles = f;
+ }
+
+ /**
+ * Specify a function that puts an input FlowFile group to a target storage using a given connection.
+ */
+ public void putFlowFiles(PutFlowFiles<FC, C, FFG> f) {
+ putFlowFiles = f;
+ }
+
+
+ @Override
+ protected void putFlowFiles(ProcessContext context, ProcessSession session, FC functionContext,
+ C connection, List<FlowFile> flowFiles, RoutingResult result) throws ProcessException {
+ final List<FFG> flowFileGroups = groupFlowFiles
+ .apply(context, session, functionContext, connection, flowFiles, result);
+
+ for (FFG group : flowFileGroups) {
+ putFlowFiles.apply(context, session, functionContext, connection, group, result);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/nifi/blob/d9acdb54/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/pattern/RollbackOnFailure.java
----------------------------------------------------------------------
diff --git a/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/pattern/RollbackOnFailure.java b/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/pattern/RollbackOnFailure.java
new file mode 100644
index 0000000..2d4d768
--- /dev/null
+++ b/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/pattern/RollbackOnFailure.java
@@ -0,0 +1,226 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processor.util.pattern;
+
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSessionFactory;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.processor.util.pattern.PartialFunctions.AdjustRoute;
+
+import java.util.List;
+import java.util.Map;
+import java.util.function.BiFunction;
+
+/**
+ * <p>RollbackOnFailure can be used as a function context for process patterns such as {@link Put} to provide a configurable error handling.
+ *
+ * <p>
+ * RollbackOnFailure can add the following characteristics to a processor:
+ * <li>When disabled, input FlowFiles that caused an error will be routed to the 'failure' or 'retry' relationship, based on the type of error.</li>
+ * <li>When enabled, input FlowFiles are kept in the input queue. A ProcessException is thrown to roll back the process session.</li>
+ * <li>It assumes anything that happened during a processor's onTrigger can be rolled back, if this is marked as transactional.</li>
+ * <li>If transactional and enabled, it rolls back the session when an error occurs, even if some FlowFiles have already been processed.</li>
+ * <li>If not transactional and enabled, it rolls back the session when an error occurs only if no progress has been made.</li>
+ * </p>
+ *
+ * <p>There are two approaches to apply RollbackOnFailure. One is using {@link ExceptionHandler#adjustError(BiFunction)},
+ * and the other is implementing processor onTrigger using process patterns such as {@link Put#adjustRoute(AdjustRoute)}. </p>
+ *
+ * <p>It's also possible to use both approaches. ExceptionHandler can apply when an Exception is thrown immediately, while AdjustRoute respond later but requires less code.</p>
+ */
+public class RollbackOnFailure {
+
+ private final boolean rollbackOnFailure;
+ private final boolean transactional;
+ private boolean discontinue;
+
+ private int processedCount = 0;
+
+ /**
+ * Constructor.
+ * @param rollbackOnFailure Should be set by user via processor configuration.
+ * @param transactional Specify whether a processor is transactional.
+ * If not, it is important to call {@link #proceed()} after each successful execution of the processor's task,
+ * to indicate that the processor made an operation that can not be undone.
+ */
+ public RollbackOnFailure(boolean rollbackOnFailure, boolean transactional) {
+ this.rollbackOnFailure = rollbackOnFailure;
+ this.transactional = transactional;
+ }
+
+ public static final PropertyDescriptor ROLLBACK_ON_FAILURE = createRollbackOnFailureProperty("");
+
+ /**
+  * Create the 'Rollback On Failure' property descriptor, optionally with a processor specific note
+  * appended to the common description.
+  * @param additionalDescription text appended to the end of the common description;
+  *                              {@code null} is treated the same as an empty string.
+  * @return a required property named 'rollback-on-failure' accepting 'true' or 'false', defaulting to 'false'.
+  */
+ public static PropertyDescriptor createRollbackOnFailureProperty(String additionalDescription) {
+     // Plain concatenation of a null reference would append the literal text "null" to the user facing description.
+     final String extraDescription = additionalDescription == null ? "" : additionalDescription;
+     return new PropertyDescriptor.Builder()
+             .name("rollback-on-failure")
+             .displayName("Rollback On Failure")
+             .description("Specify how to handle error." +
+                     " By default (false), if an error occurs while processing a FlowFile, the FlowFile will be routed to" +
+                     " 'failure' or 'retry' relationship based on error type, and processor can continue with next FlowFile." +
+                     " Instead, you may want to rollback currently processed FlowFiles and stop further processing immediately." +
+                     " In that case, you can do so by enabling this 'Rollback On Failure' property. " +
+                     " If enabled, failed FlowFiles will stay in the input relationship without penalizing it and being processed repeatedly" +
+                     " until it gets processed successfully or removed by other means." +
+                     " It is important to set adequate 'Yield Duration' to avoid retrying too frequently." + extraDescription)
+             .allowableValues("true", "false")
+             .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
+             .defaultValue("false")
+             .required(true)
+             .build();
+ }
+
+ /**
+ * Create a function to use with {@link ExceptionHandler} that adjusts the error type based on the function context.
+ * <p>The returned function takes the context and the originally determined error type and returns the result to apply.
+ * When no adjustment is made, the result of the given error type is returned as is.
+ * @param logger used to log each adjustment at debug level
+ */
+ public static <FCT extends RollbackOnFailure> BiFunction<FCT, ErrorTypes, ErrorTypes.Result> createAdjustError(final ComponentLog logger) {
+ return (c, t) -> {
+
+ // Stays null when the original result should be used unchanged.
+ ErrorTypes.Result adjusted = null;
+ switch (t.destination()) {
+
+ case ProcessException:
+ // If this process can roll back, keep the original result as is. Only the opposite case needs an adjustment.
+ if (!c.canRollback()) {
+ // If an exception is thrown but the processor is not transactional and processed count > 0, adjust it to self,
+ // in order to stop any further processing until this input is processed successfully.
+ // If we throw an Exception in this state, the already succeeded FlowFiles will be rolled back, too.
+ // In case the progress was made by other preceding inputs,
+ // those successful inputs should be sent to 'success' and this input stays in the incoming queue.
+ // In case this input made some progress to an external system, the partial update will be replayed again,
+ // which can cause duplicated data.
+ c.discontinue();
+ // We should not penalize a FlowFile, if we did, other FlowFiles can be fetched first.
+ // We need to block others to be processed until this one finishes.
+ adjusted = new ErrorTypes.Result(ErrorTypes.Destination.Self, ErrorTypes.Penalty.Yield);
+ }
+ break;
+
+ case Failure:
+ case Retry:
+ // Failure and Retry are only adjusted when the user enabled Rollback On Failure.
+ if (c.isRollbackOnFailure()) {
+ c.discontinue();
+ if (c.canRollback()) {
+ // If this process can rollback, then throw ProcessException instead, in order to rollback.
+ adjusted = new ErrorTypes.Result(ErrorTypes.Destination.ProcessException, ErrorTypes.Penalty.Yield);
+ } else {
+ // If not, route the input to self instead of the failure or retry destination.
+ adjusted = new ErrorTypes.Result(ErrorTypes.Destination.Self, ErrorTypes.Penalty.Yield);
+ }
+ }
+ break;
+ }
+
+ if (adjusted != null) {
+ if (logger.isDebugEnabled()) {
+ logger.debug("Adjusted {} to {} based on context rollbackOnFailure={}, processedCount={}, transactional={}",
+ new Object[]{t, adjusted, c.isRollbackOnFailure(), c.getProcessedCount(), c.isTransactional()});
+ }
+ return adjusted;
+ }
+
+ // Any other destination, or no adjustment needed: use the result defined by the error type.
+ return t.result();
+ };
+ }
+
+ /**
+  * Create an {@link AdjustRoute} function to use with a process pattern such as {@link Put}, adjusting routed FlowFiles based on context.
+  * It acts as a safety net for Processor implementations that transfer FlowFiles to a failure relationship
+  * without using ExceptionHandler and without considering the RollbackOnFailure context.
+  * <p>Nothing is adjusted unless Rollback On Failure is enabled. When it is enabled and FlowFiles are routed to one of the
+  * given relationships, a ProcessException is thrown if the context can roll back; otherwise those FlowFiles are re-routed to self.
+  * @param failureRelationships relationships that are regarded as failure destinations.
+  */
+ public static <FCT extends RollbackOnFailure> AdjustRoute<FCT> createAdjustRoute(Relationship ... failureRelationships) {
+     return (context, session, fc, result) -> {
+         if (!fc.isRollbackOnFailure()) {
+             return;
+         }
+
+         for (final Relationship failureRelationship : failureRelationships) {
+             // Only relationships that actually received FlowFiles need an adjustment.
+             if (result.contains(failureRelationship)) {
+                 if (!fc.canRollback()) {
+                     // Can not roll back, so move the failed FlowFiles from the failure relationship to self.
+                     final List<FlowFile> failedFlowFiles = result.getRoutedFlowFiles().remove(failureRelationship);
+                     result.routeTo(failedFlowFiles, Relationship.SELF);
+                     continue;
+                 }
+                 throw new ProcessException(String.format(
+                         "A FlowFile is routed to %s. Rollback session based on context rollbackOnFailure=%s, processedCount=%d, transactional=%s",
+                         failureRelationship.getName(), fc.isRollbackOnFailure(), fc.getProcessedCount(), fc.isTransactional()));
+             }
+         }
+     };
+ }
+
+ /**
+ * Wrap the given OnError function so that, after it has run, a {@link DiscontinuedException} is thrown
+ * if the context has been marked to discontinue.
+ * @param onError the error handling function to run first
+ * @return the composed function
+ */
+ public static <FCT extends RollbackOnFailure, I> ExceptionHandler.OnError<FCT, I> createOnError(ExceptionHandler.OnError<FCT, I> onError) {
+ return onError.andThen((context, input, result, e) -> {
+ if (context.shouldDiscontinue()) {
+ throw new DiscontinuedException("Discontinue processing due to " + e, e);
+ }
+ });
+ }
+
+ /**
+  * Execute the given onTrigger function via {@link PartialFunctions}, with a rollback behavior that depends on
+  * whether Rollback On Failure is enabled in the function context.
+  * @param context process context, also used to yield the processor.
+  * @param sessionFactory factory the session is created from.
+  * @param functionContext tells whether Rollback On Failure is enabled.
+  * @param logger used to log the administrative yield.
+  * @param onTrigger the function to execute.
+  */
+ public static <FCT extends RollbackOnFailure> void onTrigger(
+         ProcessContext context, ProcessSessionFactory sessionFactory, FCT functionContext, ComponentLog logger,
+         PartialFunctions.OnTrigger onTrigger) throws ProcessException {
+
+     PartialFunctions.onTrigger(context, sessionFactory, logger, onTrigger, (session, t) -> {
+         final boolean rollbackOnFailure = functionContext.isRollbackOnFailure();
+
+         // With RollbackOnFailure enabled, FlowFiles must not be penalized when rolling back,
+         // so that they stay in the incoming relationship to be processed again.
+         session.rollback(!rollbackOnFailure);
+
+         if (rollbackOnFailure) {
+             // A failed FlowFile that stays in the incoming relationship would be retried too often,
+             // so yield the processor administratively.
+             logger.warn("Administratively yielding {} after rolling back due to {}", new Object[]{context.getName(), t}, t);
+             context.yield();
+         }
+     });
+ }
+
+ /**
+ * Record that one more operation has been processed.
+ * @return the processed count after incrementing
+ */
+ public int proceed() {
+ return ++processedCount;
+ }
+
+ /**
+ * @return how many times {@link #proceed()} has been called on this context
+ */
+ public int getProcessedCount() {
+ return processedCount;
+ }
+
+ /**
+ * @return the rollbackOnFailure flag given to the constructor
+ */
+ public boolean isRollbackOnFailure() {
+ return rollbackOnFailure;
+ }
+
+ /**
+ * @return the transactional flag given to the constructor
+ */
+ public boolean isTransactional() {
+ return transactional;
+ }
+
+ /**
+ * @return true if this context is transactional, or if nothing has been processed yet
+ */
+ public boolean canRollback() {
+ return transactional || processedCount == 0;
+ }
+
+ /**
+ * @return true once {@link #discontinue()} has been called
+ */
+ public boolean shouldDiscontinue() {
+ return discontinue;
+ }
+
+ /**
+ * Mark this context so that {@link #shouldDiscontinue()} returns true from now on.
+ */
+ public void discontinue() {
+ this.discontinue = true;
+ }
+}
http://git-wip-us.apache.org/repos/asf/nifi/blob/d9acdb54/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/pattern/RoutingResult.java
----------------------------------------------------------------------
diff --git a/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/pattern/RoutingResult.java b/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/pattern/RoutingResult.java
new file mode 100644
index 0000000..200d893
--- /dev/null
+++ b/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/pattern/RoutingResult.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processor.util.pattern;
+
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.Relationship;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+ /**
+  * Keeps FlowFiles grouped by the relationship they have been routed to.
+  */
+ public class RoutingResult {
+
+     private final Map<Relationship, List<FlowFile>> routedFlowFiles = new HashMap<>();
+
+     /**
+      * Routes a single FlowFile to the relationship.
+      */
+     public void routeTo(final FlowFile flowFile, final Relationship relationship) {
+         flowFilesRoutedTo(relationship).add(flowFile);
+     }
+
+     /**
+      * Routes all of the FlowFiles to the relationship.
+      */
+     public void routeTo(final List<FlowFile> flowFiles, final Relationship relationship) {
+         flowFilesRoutedTo(relationship).addAll(flowFiles);
+     }
+
+     /**
+      * Adds every route held by the other result to this one.
+      */
+     public void merge(final RoutingResult r) {
+         for (final Map.Entry<Relationship, List<FlowFile>> route : r.getRoutedFlowFiles().entrySet()) {
+             routeTo(route.getValue(), route.getKey());
+         }
+     }
+
+     /**
+      * @return the internal map itself, not a copy; changes made to it are reflected in this result.
+      */
+     public Map<Relationship, List<FlowFile>> getRoutedFlowFiles() {
+         return routedFlowFiles;
+     }
+
+     /**
+      * @return true if at least one FlowFile is routed to the relationship.
+      */
+     public boolean contains(Relationship relationship) {
+         if (!routedFlowFiles.containsKey(relationship)) {
+             return false;
+         }
+         return !routedFlowFiles.get(relationship).isEmpty();
+     }
+
+     /**
+      * Returns the list of FlowFiles routed to the relationship, registering a new empty list on first use.
+      */
+     private List<FlowFile> flowFilesRoutedTo(final Relationship relationship) {
+         return routedFlowFiles.computeIfAbsent(relationship, r -> new ArrayList<>());
+     }
+ }
http://git-wip-us.apache.org/repos/asf/nifi/blob/d9acdb54/nifi-commons/nifi-processor-utilities/src/test/java/org/apache/nifi/processor/util/pattern/TestExceptionHandler.java
----------------------------------------------------------------------
diff --git a/nifi-commons/nifi-processor-utilities/src/test/java/org/apache/nifi/processor/util/pattern/TestExceptionHandler.java b/nifi-commons/nifi-processor-utilities/src/test/java/org/apache/nifi/processor/util/pattern/TestExceptionHandler.java
new file mode 100644
index 0000000..bd73379
--- /dev/null
+++ b/nifi-commons/nifi-processor-utilities/src/test/java/org/apache/nifi/processor/util/pattern/TestExceptionHandler.java
@@ -0,0 +1,202 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processor.util.pattern;
+
+import org.apache.nifi.processor.exception.ProcessException;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Function;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+public class TestExceptionHandler {
+
+ private static final Logger logger = LoggerFactory.getLogger(TestExceptionHandler.class);
+
+ /**
+ * Simulate an external procedure.
+ */
+ static class ExternalProcedure {
+ // No code visible here ever sets this to false; while it is true the "Not available" IOException is not thrown.
+ private boolean available = true;
+ /**
+ * Divides a by b using integer division.
+ * The checks run in this order: availability, then the special value 10, then the division itself.
+ * @throws IOException if this procedure is not available
+ * @throws IllegalStateException if a is 10
+ */
+ int divide(Integer a, Integer b) throws Exception {
+ if (!available) {
+ throw new IOException("Not available");
+ }
+ // Comparing the Integer with an int literal unboxes it.
+ if (a == 10) {
+ throw new IllegalStateException("Service for 10 is not currently available.");
+ }
+ return a / b;
+ }
+ }
+
+ /**
+  * Minimal function context used by the tests; counts successful executions.
+  * Declared static because it does not need a reference to the enclosing test instance.
+  */
+ private static class Context {
+     int count = 0;
+ }
+
+ /**
+  * Shows the plain try-catch style next to the ExceptionHandler style, and verifies that
+  * without an exception mapping any thrown Exception is wrapped by a ProcessException.
+  */
+ @Test
+ public void testBasicUsage() {
+
+     final ExternalProcedure p = new ExternalProcedure();
+
+     try {
+         // Without a handler, a checked exception has to be caught at each call site.
+         // Usually the error handling logic is the same everywhere,
+         // so this ends up as a lot of duplicated code.
+         final int r1 = p.divide(4, 2);
+         assertEquals(2, r1);
+     } catch (Exception e) {
+         // Dividing 4 by 2 must succeed. Printing the stack trace only would let the test pass silently.
+         fail("Unexpected exception: " + e);
+     }
+
+     final Context context = new Context();
+     final ExceptionHandler<Context> handler = new ExceptionHandler<>();
+
+     // Using handler can avoid the try catch block with reusable error handling logic.
+     handler.execute(context, 6, i -> {
+         final int r2 = p.divide(i, 2);
+         assertEquals(3, r2);
+     });
+
+     // If return value is needed, use AtomicReference.
+     AtomicReference<Integer> r = new AtomicReference<>();
+     handler.execute(context, 8, i -> r.set(p.divide(i, 2)));
+     assertEquals(4, r.get().intValue());
+
+     // If no exception mapping is specified, any Exception thrown is wrapped by ProcessException.
+     try {
+         final Integer nullInput = null;
+         handler.execute(context, nullInput, i -> r.set(p.divide(i, 2)));
+         fail("Exception should be thrown because input is null.");
+     } catch (ProcessException e) {
+         assertTrue(e.getCause() instanceof NullPointerException);
+     }
+ }
+
+ // Reusable Exception mapping function.
+ // The given exception is rethrown inside the function so that the catch clauses can classify it by type:
+ // NullPointerException, ArithmeticException and NumberFormatException map to InvalidInput,
+ // IllegalStateException maps to TemporalInputFailure, and IOException maps to TemporalFailure.
+ // Any other Exception is not mapped but wrapped by a ProcessException and thrown.
+ static Function<Exception, ErrorTypes> exceptionMapping = i -> {
+ try {
+ throw i;
+ } catch (NullPointerException | ArithmeticException | NumberFormatException e) {
+ return ErrorTypes.InvalidInput;
+ } catch (IllegalStateException e) {
+ return ErrorTypes.TemporalInputFailure;
+ } catch (IOException e) {
+ return ErrorTypes.TemporalFailure;
+ } catch (Exception e) {
+ throw new ProcessException(e);
+ }
+ };
+
+ /**
+  * Verifies that, with an exception mapping and an error handler in place, invalid inputs are handled
+  * without an exception reaching the caller, and that the handler reports them as failed.
+  */
+ @Test
+ public void testHandling() {
+
+     final ExternalProcedure p = new ExternalProcedure();
+     final Context context = new Context();
+
+     final ExceptionHandler<Context> handler = new ExceptionHandler<>();
+     handler.mapException(exceptionMapping);
+     handler.onError(createInputErrorHandler());
+
+     // Benefit of handler is being able to externalize error handling, make it simpler.
+     assertTrue("Valid input should be processed successfully.", handler.execute(context, 4, i -> {
+         final int r = p.divide(i, 2);
+         assertEquals(2, r);
+     }));
+
+     // Null pointer exception.
+     final Integer input = null;
+     assertTrue("Null input should be reported as failed.", !handler.execute(context, input, i -> {
+         p.divide(i, 2);
+         fail("Shouldn't reach here.");
+     }));
+
+     // Divide by zero.
+     assertTrue("Division by zero should be reported as failed.", !handler.execute(context, 0, i -> {
+         p.divide(2, i);
+         fail("Shouldn't reach here.");
+     }));
+
+
+ }
+
+ /**
+ * Creates an error handler for procedures taking a single Integer input.
+ * A result destined to ProcessException is thrown as a ProcessException wrapping the cause;
+ * any other result is only logged as a warning.
+ */
+ static <C> ExceptionHandler.OnError<C, Integer> createInputErrorHandler() {
+ return (c, i, r, e) -> {
+ switch (r.destination()) {
+ case ProcessException:
+ throw new ProcessException(String.format("Execution failed due to %s", e), e);
+ default:
+ logger.warn(String.format("Routing to %s: %d caused %s", r, i, e));
+ }
+ };
+ }
+
+ /**
+ * Creates an error handler for procedures taking an Integer array input, of which the first two elements are logged.
+ * A result destined to ProcessException is thrown as a ProcessException wrapping the cause;
+ * any other result is only logged as a warning.
+ */
+ static <C> ExceptionHandler.OnError<C, Integer[]> createArrayInputErrorHandler() {
+ return (c, i, r, e) -> {
+ switch (r.destination()) {
+ case ProcessException:
+ throw new ProcessException(String.format("Execution failed due to %s", e), e);
+ default:
+ logger.warn(String.format("Routing to %s: %d, %d caused %s", r, i[0], i[1], e));
+ }
+ };
+ }
+
+ /**
+ * Verifies the handler when it is used while looping through multiple inputs,
+ * some of which make the external procedure throw.
+ */
+ @Test
+ public void testHandlingLoop() {
+
+ final ExternalProcedure p = new ExternalProcedure();
+ final Context context = new Context();
+
+ final ExceptionHandler<Context> handler = new ExceptionHandler<>();
+ handler.mapException(exceptionMapping);
+ handler.onError(createArrayInputErrorHandler());
+
+ // It's especially handy when looping through inputs. [a, b, expected result]
+ // 999 is a placeholder for inputs that make p.divide throw, so their expected result is never compared.
+ Integer[][] inputs = new Integer[][]{{4, 2, 2}, {null, 2, 999}, {2, 0, 999}, {10, 2, 999}, {8, 2, 4}};
+
+ Arrays.stream(inputs).forEach(input -> handler.execute(context, input, (in) -> {
+ final Integer r = p.divide(in[0], in[1]);
+ // This is safe because if p.divide throws error, this code won't be executed.
+ assertEquals(in[2], r);
+ }));
+
+ AtomicReference<Integer> r = new AtomicReference<>();
+ for (Integer[] input : inputs) {
+
+ if (!handler.execute(context, input, (in) -> {
+ r.set(p.divide(in[0], in[1]));
+ context.count++;
+ })){
+ // Handler returns false when it fails.
+ // This makes the skip-the-failed-input-and-continue pattern cleaner to write.
+ continue;
+ }
+
+ assertEquals(input[2], r.get());
+ }
+
+ // Only the second loop increments the count, and only {4, 2} and {8, 2} can be divided successfully.
+ assertEquals("Successful inputs", 2, context.count);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/nifi/blob/d9acdb54/nifi-commons/nifi-processor-utilities/src/test/java/org/apache/nifi/processor/util/pattern/TestRollbackOnFailure.java
----------------------------------------------------------------------
diff --git a/nifi-commons/nifi-processor-utilities/src/test/java/org/apache/nifi/processor/util/pattern/TestRollbackOnFailure.java b/nifi-commons/nifi-processor-utilities/src/test/java/org/apache/nifi/processor/util/pattern/TestRollbackOnFailure.java
new file mode 100644
index 0000000..6d73759
--- /dev/null
+++ b/nifi-commons/nifi-processor-utilities/src/test/java/org/apache/nifi/processor/util/pattern/TestRollbackOnFailure.java
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processor.util.pattern;
+
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.pattern.TestExceptionHandler.ExternalProcedure;
+import org.apache.nifi.util.MockComponentLog;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.apache.nifi.processor.util.pattern.TestExceptionHandler.createArrayInputErrorHandler;
+import static org.apache.nifi.processor.util.pattern.TestExceptionHandler.exceptionMapping;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+public class TestRollbackOnFailure {
+
+ private static final Logger logger = LoggerFactory.getLogger(TestRollbackOnFailure.class);
+
+ /**
+ * An example of how to compose an ExceptionHandler instance from reusable functions.
+ * @param logger used to log messages within functions
+ * @return a composed ExceptionHandler
+ */
+ private ExceptionHandler<RollbackOnFailure> getContextAwareExceptionHandler(ComponentLog logger) {
+ final ExceptionHandler<RollbackOnFailure> handler = new ExceptionHandler<>();
+ handler.mapException(exceptionMapping);
+ handler.adjustError(RollbackOnFailure.createAdjustError(logger));
+ handler.onError(createArrayInputErrorHandler());
+ return handler;
+ }
+
+ private void processInputs(RollbackOnFailure context, Integer[][] inputs, List<Integer> results) {
+ final ExternalProcedure p = new ExternalProcedure();
+ final MockComponentLog componentLog = new MockComponentLog("processor-id", this);
+ final ExceptionHandler<RollbackOnFailure> handler = getContextAwareExceptionHandler(componentLog);
+
+ for (Integer[] input : inputs) {
+ // A false return from execute() skips the result check below for this input.
+ if (!handler.execute(context, input, (in) -> {
+ results.add(p.divide(in[0], in[1]));
+ context.proceed();
+ })){
+ continue;
+ }
+ // Only reached when execute() returned true: the last recorded result must match the expected value in input[2].
+ assertEquals(input[2], results.get(results.size() - 1));
+ }
+ }
+
+ @Test
+ public void testContextDefaultBehavior() {
+
+ // With rollbackOnFailure disabled, inputs are routed to Failure or Retry as they are; no ProcessException is expected.
+ final RollbackOnFailure context = new RollbackOnFailure(false, false);
+
+ Integer[][] inputs = new Integer[][]{{null, 2, 999}, {4, 2, 2}, {2, 0, 999}, {10, 2, 999}, {8, 2, 4}};
+
+ final List<Integer> results = new ArrayList<>();
+ try {
+ processInputs(context, inputs, results);
+ } catch (ProcessException e) {
+ fail("ProcessException should NOT be thrown");
+ }
+
+ assertEquals("Successful inputs", 2, context.getProcessedCount());
+ }
+
+ @Test
+ public void testContextRollbackOnFailureNonTransactionalFirstFailure() {
+ // NOTE(review): the constructor arguments appear to be (rollbackOnFailure, transactional), judging by the test names - confirm.
+ final RollbackOnFailure context = new RollbackOnFailure(true, false);
+
+ // If the first execution fails before any input has succeeded, it should throw a ProcessException.
+ Integer[][] inputs = new Integer[][]{{null, 2, 999}, {4, 2, 2}, {2, 0, 999}, {10, 2, 999}, {8, 2, 4}};
+
+ final List<Integer> results = new ArrayList<>();
+ try {
+ processInputs(context, inputs, results);
+ fail("ProcessException should be thrown");
+ } catch (ProcessException e) {
+ logger.info("Exception was thrown as expected.");
+ }
+
+ assertEquals("Successful inputs", 0, context.getProcessedCount());
+ }
+
+ @Test
+ public void testContextRollbackOnFailureNonTransactionalAlreadySucceeded() {
+
+ final RollbackOnFailure context = new RollbackOnFailure(true, false);
+
+ // If an execution fails after some inputs have succeeded, the input is transferred to Failure instead of throwing a ProcessException,
+ // and processing keeps going, because the external system does not support transactions.
+ Integer[][] inputs = new Integer[][]{{4, 2, 2}, {2, 0, 999}, {null, 2, 999}, {10, 2, 999}, {8, 2, 4}};
+
+ final List<Integer> results = new ArrayList<>();
+ try {
+ processInputs(context, inputs, results);
+ } catch (ProcessException e) {
+ fail("ProcessException should NOT be thrown");
+ }
+
+ assertEquals("Successful inputs", 2, context.getProcessedCount());
+ }
+
+ @Test
+ public void testContextRollbackOnFailureTransactionalAlreadySucceeded() {
+
+ final RollbackOnFailure context = new RollbackOnFailure(true, true);
+
+ // Even if an execution fails after some inputs have succeeded, a ProcessException should be thrown,
+ // because the external system supports transactions.
+ Integer[][] inputs = new Integer[][]{{4, 2, 2}, {2, 0, 999}, {null, 2, 999}, {10, 2, 999}, {8, 2, 4}};
+
+ final List<Integer> results = new ArrayList<>();
+ try {
+ processInputs(context, inputs, results);
+ fail("ProcessException should be thrown");
+ } catch (ProcessException e) {
+ logger.info("Exception was thrown as expected.");
+ }
+
+ }
+}
http://git-wip-us.apache.org/repos/asf/nifi/blob/d9acdb54/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/pom.xml
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/pom.xml b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/pom.xml
index ea6e5df..661180e 100644
--- a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/pom.xml
+++ b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/pom.xml
@@ -32,6 +32,10 @@
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-processor-utils</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.nifi</groupId>
<artifactId>nifi-dbcp-service-api</artifactId>
<scope>provided</scope>
</dependency>
http://git-wip-us.apache.org/repos/asf/nifi/blob/d9acdb54/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/processors/hive/AbstractHiveQLProcessor.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/processors/hive/AbstractHiveQLProcessor.java b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/processors/hive/AbstractHiveQLProcessor.java
index 3835ff7..1a2110a 100644
--- a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/processors/hive/AbstractHiveQLProcessor.java
+++ b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/processors/hive/AbstractHiveQLProcessor.java
@@ -19,9 +19,8 @@ package org.apache.nifi.processors.hive;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.dbcp.hive.HiveDBCPService;
import org.apache.nifi.flowfile.FlowFile;
-import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.AbstractSessionFactoryProcessor;
import org.apache.nifi.processor.ProcessSession;
-import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.io.InputStreamCallback;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.stream.io.StreamUtils;
@@ -30,6 +29,7 @@ import java.io.IOException;
import java.io.InputStream;
import java.math.BigDecimal;
import java.nio.charset.Charset;
+import java.sql.SQLDataException;
import java.sql.Time;
import java.sql.Timestamp;
import java.sql.Date;
@@ -45,7 +45,7 @@ import java.util.regex.Pattern;
/**
* An abstract base class for HiveQL processors to share common data, methods, etc.
*/
-public abstract class AbstractHiveQLProcessor extends AbstractProcessor {
+public abstract class AbstractHiveQLProcessor extends AbstractSessionFactoryProcessor {
protected static final Pattern HIVEQL_TYPE_ATTRIBUTE_PATTERN = Pattern.compile("hiveql\\.args\\.(\\d+)\\.type");
protected static final Pattern NUMBER_PATTERN = Pattern.compile("-?\\d+");
@@ -112,7 +112,7 @@ public abstract class AbstractHiveQLProcessor extends AbstractProcessor {
if (parameterIndex >= base && parameterIndex < base + paramCount) {
final boolean isNumeric = NUMBER_PATTERN.matcher(entry.getValue()).matches();
if (!isNumeric) {
- throw new ProcessException("Value of the " + key + " attribute is '" + entry.getValue() + "', which is not a valid JDBC numeral jdbcType");
+ throw new SQLDataException("Value of the " + key + " attribute is '" + entry.getValue() + "', which is not a valid JDBC numeral jdbcType");
}
final String valueAttrName = "hiveql.args." + parameterIndex + ".value";
@@ -139,7 +139,7 @@ public abstract class AbstractHiveQLProcessor extends AbstractProcessor {
try {
setParameter(stmt, ph.attributeName, index, ph.value, ph.jdbcType);
} catch (final NumberFormatException nfe) {
- throw new ProcessException("The value of the " + ph.attributeName + " is '" + ph.value + "', which cannot be converted into the necessary data jdbcType", nfe);
+ throw new SQLDataException("The value of the " + ph.attributeName + " is '" + ph.value + "', which cannot be converted into the necessary data jdbcType", nfe);
}
}
return base + paramCount;