You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by al...@apache.org on 2017/05/01 20:12:02 UTC

[09/17] nifi git commit: NIFI-3724 - Initial commit of Parquet bundle with PutParquet and FetchParquet - Creating nifi-records-utils to share utility code from record services - Refactoring Parquet tests to use MockRecorderParser and MockRecordWriter - R

http://git-wip-us.apache.org/repos/asf/nifi/blob/60d88b5a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/pattern/ExceptionHandler.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/pattern/ExceptionHandler.java b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/pattern/ExceptionHandler.java
new file mode 100644
index 0000000..bd1c9eb
--- /dev/null
+++ b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/pattern/ExceptionHandler.java
@@ -0,0 +1,235 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processor.util.pattern;
+
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.pattern.ErrorTypes.Result;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.function.BiFunction;
+import java.util.function.Function;
+
+/**
+ * <p>ExceptionHandler provides a structured Exception handling logic composed by reusable partial functions.
+ *
+ * <p>
+ *     Benefits of using ExceptionHandler:
+ *     <li>Externalized error handling code which provides cleaner program only focusing on the expected path.</li>
+ *     <li>Classify specific Exceptions into {@link ErrorTypes}, consolidated error handling based on error type.</li>
+ *     <li>Context aware error handling, {@link RollbackOnFailure} for instance.</li>
+ * </p>
+ */
+public class ExceptionHandler<C> {
+
+    @FunctionalInterface
+    public interface Procedure<I> {
+        void apply(I input) throws Exception;
+    }
+
+    public interface OnError<C, I> {
+        void apply(C context, I input, Result result, Exception e);
+
+        default OnError<C, I> andThen(OnError<C, I> after) {
+            return (c, i, r, e) -> {
+                apply(c, i, r, e);
+                after.apply(c, i, r, e);
+            };
+        }
+    }
+
+    /**
+     * Simply categorise an Exception.
+     */
+    private Function<Exception, ErrorTypes> mapException;
+
+    /**
+     * Adjust error type based on the context.
+     */
+    private BiFunction<C, ErrorTypes, Result> adjustError;
+
+    /**
+     * Do some action to the input based on the final error type.
+     */
+    private OnError<C, ?> onError;
+
+    /**
+     * Specify a function that maps an Exception to certain ErrorType.
+     */
+    public void mapException(Function<Exception, ErrorTypes> mapException) {
+        this.mapException = mapException;
+    }
+
+    /**
+     * <p>Specify a function that adjust ErrorType based on a function context.
+     * <p>For example, {@link RollbackOnFailure#createAdjustError(ComponentLog)} decides
+     * whether a process session should rollback or transfer input to failure or retry.
+     */
+    public void adjustError(BiFunction<C, ErrorTypes, Result> adjustError) {
+        this.adjustError = adjustError;
+    }
+
+    /**
+     * <p>Specify a default OnError function that will be called if one is not explicitly specified when {@link #execute(Object, Object, Procedure)} is called.
+     */
+    public void onError(OnError<C, ?> onError) {
+        this.onError = onError;
+    }
+
+    /**
+     * <p>Executes specified procedure function with the input.
+     * <p>Default OnError function will be called when an exception is thrown.
+     * @param context function context
+     * @param input input for procedure
+     * @param procedure a function that does something with the input
+     * @return True if the procedure finished without issue. False if procedure threw an Exception but it was handled by {@link OnError}.
+     * @throws ProcessException Thrown if the exception was not handled by {@link OnError}
+     * @throws DiscontinuedException Indicating the exception was handled by {@link OnError} but process should stop immediately
+     * without processing any further input
+     */
+    @SuppressWarnings("unchecked")
+    public <I> boolean execute(C context, I input, Procedure<I> procedure) throws ProcessException, DiscontinuedException {
+        return execute(context, input, procedure, (OnError<C, I>) onError);
+    }
+
+    /**
+     * <p>Executes specified procedure function with the input.
+     * @param context function context
+     * @param input input for procedure
+     * @param procedure a function that does something with the input
+     * @param onError specify {@link OnError} function for this execution
+     * @return True if the procedure finished without issue. False if procedure threw an Exception but it was handled by {@link OnError}.
+     * @throws ProcessException Thrown if the exception was not handled by {@link OnError}
+     * @throws DiscontinuedException Indicating the exception was handled by {@link OnError} but process should stop immediately
+     * without processing any further input
+     */
+    public <I> boolean execute(C context, I input, Procedure<I> procedure, OnError<C, I> onError) throws ProcessException, DiscontinuedException {
+        try {
+            procedure.apply(input);
+            return true;
+        } catch (Exception e) {
+
+            if (mapException == null) {
+                throw new ProcessException("An exception was thrown: " + e, e);
+            }
+
+            final ErrorTypes type = mapException.apply(e);
+
+            final Result result;
+            if (adjustError != null) {
+                result = adjustError.apply(context, type);
+            } else {
+                result = new Result(type.destination(), type.penalty());
+            }
+
+            if (onError == null) {
+                throw new IllegalStateException("OnError is not set.");
+            }
+
+            onError.apply(context, input, result, e);
+        }
+        return false;
+    }
+
+    private static FlowFile penalize(final ProcessContext context, final ProcessSession session,
+                                     final FlowFile flowFile, final ErrorTypes.Penalty penalty) {
+        switch (penalty) {
+            case Penalize:
+                return session.penalize(flowFile);
+            case Yield:
+                context.yield();
+        }
+        return flowFile;
+    }
+
+    /**
+     * Create a {@link OnError} function instance that routes input based on {@link Result} destination and penalty.
+     * @param context process context is used to yield a processor
+     * @param session process session is used to penalize a FlowFile
+     * @param routingResult input FlowFile will be routed to a destination relationship in this {@link RoutingResult}
+     * @param relFailure specify failure relationship of a processor
+     * @param relRetry specify retry relationship of a processor
+     * @return composed function
+     */
+    public static <C> ExceptionHandler.OnError<C, FlowFile> createOnError(
+            final ProcessContext context, final ProcessSession session, final RoutingResult routingResult,
+            final Relationship relFailure, final Relationship relRetry) {
+
+        return (fc, input, result, e) -> {
+            final PartialFunctions.FlowFileGroup flowFileGroup = () -> Collections.singletonList(input);
+            createOnGroupError(context, session, routingResult, relFailure, relRetry).apply(fc, flowFileGroup, result, e);
+        };
+    }
+
+    /**
+     * Same as {@link #createOnError(ProcessContext, ProcessSession, RoutingResult, Relationship, Relationship)} for FlowFileGroup.
+     * @param context process context is used to yield a processor
+     * @param session process session is used to penalize FlowFiles
+     * @param routingResult input FlowFiles will be routed to a destination relationship in this {@link RoutingResult}
+     * @param relFailure specify failure relationship of a processor
+     * @param relRetry specify retry relationship of a processor
+     * @return composed function
+     */
+    public static <C, I extends PartialFunctions.FlowFileGroup> ExceptionHandler.OnError<C, I> createOnGroupError(
+            final ProcessContext context, final ProcessSession session, final RoutingResult routingResult,
+            final Relationship relFailure, final Relationship relRetry) {
+        return (c, g, r, e) -> {
+            final Relationship routeTo;
+            switch (r.destination()) {
+                case Failure:
+                    routeTo = relFailure;
+                    break;
+                case Retry:
+                    routeTo = relRetry;
+                    break;
+                case Self:
+                    routeTo = Relationship.SELF;
+                    break;
+                default:
+                    if (e instanceof ProcessException) {
+                        throw (ProcessException)e;
+                    } else {
+                        Object inputs = null;
+                        if (g != null) {
+                            final List<FlowFile> flowFiles = g.getFlowFiles();
+                            switch (flowFiles.size()) {
+                                case 0:
+                                    inputs = "[]";
+                                    break;
+                                case 1:
+                                    inputs = flowFiles.get(0);
+                                    break;
+                                default:
+                                    inputs = String.format("%d FlowFiles including %s", flowFiles.size(), flowFiles.get(0));
+                                    break;
+                            }
+                        }
+                        throw new ProcessException(String.format("Failed to process %s due to %s", inputs, e), e);
+                    }
+            }
+            for (FlowFile f : g.getFlowFiles()) {
+                final FlowFile maybePenalized = penalize(context, session, f, r.penalty());
+                routingResult.routeTo(maybePenalized, routeTo);
+            }
+        };
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/60d88b5a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/pattern/PartialFunctions.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/pattern/PartialFunctions.java b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/pattern/PartialFunctions.java
new file mode 100644
index 0000000..8332289
--- /dev/null
+++ b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/pattern/PartialFunctions.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processor.util.pattern;
+
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.ProcessSessionFactory;
+import org.apache.nifi.processor.exception.ProcessException;
+
+import java.util.List;
+
+/**
+ * A collection of partial function types, and a few ready-made implementations,
+ * that can be reused among process patterns.
+ */
+public class PartialFunctions {
+
+    /**
+     * Produces a connection object from the process context, session and function context.
+     * @param <FC> class of the function context
+     * @param <C> class of the connection
+     */
+    @FunctionalInterface
+    public interface InitConnection<FC, C> {
+        C apply(ProcessContext context, ProcessSession session, FC functionContext) throws ProcessException;
+    }
+
+    /**
+     * Produces the list of FlowFiles to work on; a {@link RoutingResult} is also passed in.
+     * @param <FC> class of the function context
+     */
+    @FunctionalInterface
+    public interface FetchFlowFiles<FC> {
+        List<FlowFile> apply(ProcessContext context, ProcessSession session, FC functionContext, RoutingResult result) throws ProcessException;
+    }
+
+    /**
+     * Callback that receives the function context and the connection.
+     * @param <FC> class of the function context
+     * @param <C> class of the connection
+     */
+    @FunctionalInterface
+    public interface OnCompleted<FC, C> {
+        void apply(ProcessContext context, ProcessSession session, FC functionContext, C connection) throws ProcessException;
+    }
+
+    /**
+     * Callback that receives the function context, the connection and the Exception that occurred.
+     * @param <FC> class of the function context
+     * @param <C> class of the connection
+     */
+    @FunctionalInterface
+    public interface OnFailed<FC, C> {
+        void apply(ProcessContext context, ProcessSession session, FC functionContext, C connection, Exception e) throws ProcessException;
+    }
+
+    /**
+     * Callback that receives the function context and the connection.
+     * @param <FC> class of the function context
+     * @param <C> class of the connection
+     */
+    @FunctionalInterface
+    public interface Cleanup<FC, C> {
+        void apply(ProcessContext context, ProcessSession session, FC functionContext, C connection) throws ProcessException;
+    }
+
+    /**
+     * Supplies a list of FlowFiles.
+     */
+    @FunctionalInterface
+    public interface FlowFileGroup {
+        List<FlowFile> getFlowFiles();
+    }
+
+    /**
+     * Callback that receives a {@link RoutingResult} together with the function context.
+     * @param <FC> class of the function context
+     */
+    @FunctionalInterface
+    public interface AdjustRoute<FC> {
+        void apply(ProcessContext context, ProcessSession session, FC functionContext, RoutingResult result) throws ProcessException;
+    }
+
+    /**
+     * Callback that receives a {@link RoutingResult} together with the function context.
+     * @param <FC> class of the function context
+     */
+    @FunctionalInterface
+    public interface TransferFlowFiles<FC> {
+        void apply(ProcessContext context, ProcessSession session, FC functionContext, RoutingResult result) throws ProcessException;
+
+        /**
+         * Chains two transfer functions: this one runs first, then {@code after}, both with the same arguments.
+         * @param after the function to run after this one
+         * @return the chained function
+         */
+        default TransferFlowFiles<FC> andThen(TransferFlowFiles<FC> after) {
+            final TransferFlowFiles<FC> first = this;
+            return (ctx, s, fc, routing) -> {
+                first.apply(ctx, s, fc, routing);
+                after.apply(ctx, s, fc, routing);
+            };
+        }
+    }
+
+    /**
+     * @return a fetch function that returns the result of {@code session.get(1)}
+     */
+    public static <FCT> PartialFunctions.FetchFlowFiles<FCT> fetchSingleFlowFile() {
+        return (ctx, s, fc, routing) -> s.get(1);
+    }
+
+    /**
+     * @return a transfer function that passes each entry of the routed FlowFiles in the {@link RoutingResult}
+     * to {@code session.transfer}
+     */
+    public static <FCT> PartialFunctions.TransferFlowFiles<FCT> transferRoutedFlowFiles() {
+        return (ctx, s, fc, routing) -> {
+            routing.getRoutedFlowFiles().forEach((destination, routed) -> {
+                s.transfer(routed, destination);
+            });
+        };
+    }
+
+    /**
+     * The work to execute with a process session.
+     */
+    @FunctionalInterface
+    public interface OnTrigger {
+        void execute(ProcessSession session) throws ProcessException;
+    }
+
+    /**
+     * Rolls back a process session, given the Throwable that caused the rollback.
+     */
+    @FunctionalInterface
+    public interface RollbackSession {
+        void rollback(ProcessSession session, Throwable t);
+    }
+
+    /**
+     * <p>Creates a session from the ProcessSessionFactory, executes the specified onTrigger function,
+     * and commits the session if onTrigger finishes successfully.</p>
+     * <p>When anything is thrown during execution, the session is rolled back by calling
+     * {@code session.rollback(true)} and the Throwable is rethrown.</p>
+     */
+    public static void onTrigger(ProcessContext context, ProcessSessionFactory sessionFactory, ComponentLog logger, OnTrigger onTrigger) throws ProcessException {
+        final RollbackSession penalizingRollback = (s, cause) -> s.rollback(true);
+        onTrigger(context, sessionFactory, logger, onTrigger, penalizingRollback);
+    }
+
+    /**
+     * <p>Same flow as {@link #onTrigger(ProcessContext, ProcessSessionFactory, ComponentLog, OnTrigger)},
+     * but the rollback is delegated to the given {@link RollbackSession}.</p>
+     * <p>On failure the error is logged first, then the rollback function is called, then the Throwable is rethrown.</p>
+     */
+    public static void onTrigger(
+            ProcessContext context, ProcessSessionFactory sessionFactory, ComponentLog logger, OnTrigger onTrigger,
+            RollbackSession rollbackSession) throws ProcessException {
+        final ProcessSession newSession = sessionFactory.createSession();
+        try {
+            onTrigger.execute(newSession);
+            newSession.commit();
+        } catch (final Throwable failure) {
+            final Object[] logArguments = new Object[]{onTrigger, failure};
+            logger.error("{} failed to process due to {}; rolling back session", logArguments);
+            rollbackSession.rollback(newSession, failure);
+            throw failure;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/60d88b5a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/pattern/Put.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/pattern/Put.java b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/pattern/Put.java
new file mode 100644
index 0000000..790f48a
--- /dev/null
+++ b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/pattern/Put.java
@@ -0,0 +1,228 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processor.util.pattern;
+
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+
+import java.util.List;
+import java.util.Objects;
+import java.util.stream.Collectors;
+
+/**
+ * Abstract Put pattern class with a generic onTrigger method structure, composed with various partial functions.
+ * @param <FC> Class of context instance which is passed to each partial functions.
+ *            Lifetime of an function context should be limited for a single onTrigger method.
+ * @param <C> Class of connection to a data storage that this pattern puts data into.
+ */
+public class Put<FC, C extends AutoCloseable> {
+    protected PartialFunctions.InitConnection<FC, C> initConnection;
+    protected PartialFunctions.FetchFlowFiles<FC> fetchFlowFiles = PartialFunctions.fetchSingleFlowFile();
+    protected PutFlowFile<FC, C> putFlowFile;
+    protected PartialFunctions.TransferFlowFiles<FC> transferFlowFiles = PartialFunctions.transferRoutedFlowFiles();
+    protected PartialFunctions.AdjustRoute<FC> adjustRoute;
+    protected PartialFunctions.OnCompleted<FC, C> onCompleted;
+    protected PartialFunctions.OnFailed<FC, C> onFailed;
+    protected PartialFunctions.Cleanup<FC, C> cleanup;
+    protected ComponentLog logger;
+
+    /**
+     * Put fetched FlowFiles to a data storage.
+     * @param context process context passed from a Processor onTrigger.
+     * @param session process session passed from a Processor onTrigger.
+     * @param functionContext function context passed from a Processor onTrigger.
+     * @param connection connection to data storage, established by {@link PartialFunctions.InitConnection}.
+     * @param flowFiles FlowFiles fetched from {@link PartialFunctions.FetchFlowFiles}.
+     * @param result Route incoming FlowFiles if necessary.
+     */
+    protected void putFlowFiles(ProcessContext context, ProcessSession session,
+                                        FC functionContext, C connection, List<FlowFile> flowFiles, RoutingResult result) throws ProcessException {
+        for (FlowFile flowFile : flowFiles) {
+            putFlowFile.apply(context, session, functionContext, connection, flowFile, result);
+        }
+    }
+
+    protected void validateCompositePattern() {
+        Objects.requireNonNull(initConnection, "InitConnection function is required.");
+        Objects.requireNonNull(putFlowFile, "PutFlowFile function is required.");
+        Objects.requireNonNull(transferFlowFiles, "TransferFlowFiles function is required.");
+    }
+
+    /**
+     * <p>Processor using this pattern is expected to call this method from its onTrigger.
+     * <p>Typical usage would be constructing a process pattern instance at a processor method
+     * which is annotated with {@link org.apache.nifi.annotation.lifecycle.OnScheduled},
+     * and use pattern.onTrigger from processor.onTrigger.
+     * <p>{@link PartialFunctions.InitConnection} is required at least. In addition to any functions required by an implementation class.
+     * @param context process context passed from a Processor onTrigger.
+     * @param session process session passed from a Processor onTrigger.
+     * @param functionContext function context should be instantiated per onTrigger call.
+     * @throws ProcessException Each partial function can throw ProcessException if onTrigger should stop immediately.
+     */
+    public void onTrigger(ProcessContext context, ProcessSession session, FC functionContext) throws ProcessException {
+
+        validateCompositePattern();
+
+        final RoutingResult result = new RoutingResult();
+        final List<FlowFile> flowFiles = fetchFlowFiles.apply(context, session, functionContext, result);
+
+        // Transfer FlowFiles if there is any.
+        result.getRoutedFlowFiles().forEach(((relationship, routedFlowFiles) ->
+                session.transfer(routedFlowFiles, relationship)));
+
+        if (flowFiles == null || flowFiles.isEmpty()) {
+            logger.debug("No incoming FlowFiles.");
+            return;
+        }
+
+        try (C connection = initConnection.apply(context, session, functionContext)) {
+
+            try {
+                // Execute the core function.
+                try {
+                    putFlowFiles(context, session, functionContext, connection, flowFiles, result);
+                } catch (DiscontinuedException e) {
+                    // Whether it was an error or semi normal is depends on the implementation and reason why it wanted to discontinue.
+                    // So, no logging is needed here.
+                }
+
+                // Extension point to alter routes.
+                if (adjustRoute != null) {
+                    adjustRoute.apply(context, session, functionContext, result);
+                }
+
+                // Put fetched, but unprocessed FlowFiles back to self.
+                final List<FlowFile> transferredFlowFiles = result.getRoutedFlowFiles().values().stream()
+                        .flatMap(List::stream).collect(Collectors.toList());
+                final List<FlowFile> unprocessedFlowFiles = flowFiles.stream()
+                        .filter(flowFile -> !transferredFlowFiles.contains(flowFile)).collect(Collectors.toList());
+                result.routeTo(unprocessedFlowFiles, Relationship.SELF);
+
+                // OnCompleted processing.
+                if (onCompleted != null) {
+                    onCompleted.apply(context, session, functionContext, connection);
+                }
+
+                // Transfer FlowFiles.
+                transferFlowFiles.apply(context, session, functionContext, result);
+
+            } catch (Exception e) {
+                if (onFailed != null) {
+                    onFailed.apply(context, session, functionContext, connection, e);
+                }
+                throw e;
+            } finally {
+                if (cleanup != null) {
+                    cleanup.apply(context, session, functionContext, connection);
+                }
+            }
+
+        } catch (ProcessException e) {
+            throw e;
+        } catch (Exception e) {
+            // Throw uncaught exception as RuntimeException so that this processor will be yielded.
+            final String msg = String.format("Failed to execute due to %s", e);
+            logger.error(msg, e);
+            throw new RuntimeException(msg, e);
+        }
+
+    }
+
+    /**
+     * Specify an optional function that fetches incoming FlowFIles.
+     * If not specified, single FlowFile is fetched on each onTrigger.
+     * @param f Function to fetch incoming FlowFiles.
+     */
+    public void fetchFlowFiles(PartialFunctions.FetchFlowFiles<FC> f) {
+        fetchFlowFiles = f;
+    }
+
+    /**
+     * Specify a function that establishes a connection to target data storage.
+     * This function will be called when there is valid incoming FlowFiles.
+     * The created connection instance is automatically closed when onTrigger is finished.
+     * @param f Function to initiate a connection to a data storage.
+     */
+    public void initConnection(PartialFunctions.InitConnection<FC, C> f) {
+        initConnection = f;
+    }
+
+    /**
+     * Specify a function that puts an incoming FlowFile to target data storage.
+     * @param f a function to put a FlowFile to target storage.
+     */
+    public void putFlowFile(PutFlowFile<FC, C> f) {
+        this.putFlowFile = f;
+    }
+
+    /**
+     * Specify an optional function that adjust routed FlowFiles before transfer it.
+     * @param f a function to adjust route.
+     */
+    public void adjustRoute(PartialFunctions.AdjustRoute<FC> f) {
+        this.adjustRoute = f;
+    }
+
+    /**
+     * Specify an optional function responsible for transferring routed FlowFiles.
+     * If not specified routed FlowFiles are simply transferred to its destination by default.
+     * @param f a function to transfer routed FlowFiles.
+     */
+    public void transferFlowFiles(PartialFunctions.TransferFlowFiles<FC> f) {
+        this.transferFlowFiles = f;
+    }
+
+    /**
+     * Specify an optional function which will be called if input FlowFiles were successfully put to a target storage.
+     * @param f Function to be called when a put operation finishes successfully.
+     */
+    public void onCompleted(PartialFunctions.OnCompleted<FC, C> f) {
+        onCompleted = f;
+    }
+
+    /**
+     * Specify an optional function which will be called if input FlowFiles failed being put to a target storage.
+     * @param f Function to be called when a put operation failed.
+     */
+    public void onFailed(PartialFunctions.OnFailed<FC, C> f) {
+        onFailed = f;
+    }
+
+    /**
+     * Specify an optional function which will be called in a finally block.
+     * Typically useful when a special cleanup operation is needed for the connection.
+     * @param f Function to be called when a put operation finished regardless of whether it succeeded or not.
+     */
+    public void cleanup(PartialFunctions.Cleanup<FC, C> f) {
+        cleanup = f;
+    }
+
+    public void setLogger(ComponentLog logger) {
+        this.logger = logger;
+    }
+
+    @FunctionalInterface
+    public interface PutFlowFile<FC, C> {
+        void apply(ProcessContext context, ProcessSession session, FC functionContext, C connection,
+                   FlowFile flowFile, RoutingResult result) throws ProcessException;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/60d88b5a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/pattern/PutGroup.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/pattern/PutGroup.java b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/pattern/PutGroup.java
new file mode 100644
index 0000000..6e9da2e
--- /dev/null
+++ b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/pattern/PutGroup.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processor.util.pattern;
+
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.exception.ProcessException;
+
+import java.util.List;
+import java.util.Objects;
+
+/**
+ * Extended Put pattern capable of handling FlowFile groups.
+ * <p>Fetched FlowFiles are first grouped by a {@link GroupFlowFiles} function, then each group is put
+ * by a {@link PutFlowFiles} function. Both functions are required.
+ * @param <FC> Function context class.
+ * @param <C> Connection class.
+ * @param <FFG> FlowFileGroup class.
+ */
+public class PutGroup<FC, C extends AutoCloseable, FFG extends PartialFunctions.FlowFileGroup> extends Put<FC, C> {
+
+    /**
+     * A function that puts one FlowFile group to a target storage using a given connection.
+     */
+    @FunctionalInterface
+    public interface PutFlowFiles<FC, C, FFG> {
+        void apply(ProcessContext context, ProcessSession session, FC functionContext, C connection,
+                            FFG inputFlowFileGroup, RoutingResult result) throws ProcessException;
+    }
+
+    /**
+     * A function that groups the fetched FlowFiles into FlowFile groups.
+     */
+    @FunctionalInterface
+    public interface GroupFlowFiles<FC, C, FFG> {
+        List<FFG> apply(ProcessContext context, ProcessSession session, FC functionContext, C connection, List<FlowFile> flowFiles, RoutingResult result) throws ProcessException;
+    }
+
+    private GroupFlowFiles<FC, C, FFG> groupFlowFiles;
+    private PutFlowFiles<FC, C, FFG> putFlowFiles;
+
+    public PutGroup() {
+        // Just to make a composition valid.
+        this.putFlowFile = (context, session, functionContext, connection, inputFlowFile, result) -> {
+            throw new UnsupportedOperationException();
+        };
+    }
+
+    /**
+     * In addition to the functions required by {@link Put}, both the GroupFlowFiles and the PutFlowFiles
+     * functions have to be set.
+     * @throws NullPointerException if a required function is not set
+     */
+    @Override
+    protected void validateCompositePattern() {
+        super.validateCompositePattern();
+        Objects.requireNonNull(groupFlowFiles, "GroupFlowFiles function is required.");
+        // Fail fast with a clear message instead of a bare NullPointerException after FlowFiles are already grouped.
+        Objects.requireNonNull(putFlowFiles, "PutFlowFiles function is required.");
+    }
+
+    /**
+     * PutGroup does not support PutFlowFile function for single FlowFile.
+     * Throws UnsupportedOperationException if called.
+     */
+    @Override
+    public void putFlowFile(PutFlowFile<FC, C> putFlowFile) {
+        throw new UnsupportedOperationException("PutFlowFile can not be used with PutGroup pattern. Specify PutFlowFiles instead.");
+    }
+
+    /**
+     * Specify a function that groups input FlowFiles into FlowFile groups.
+     */
+    public void groupFetchedFlowFiles(GroupFlowFiles<FC, C, FFG> f) {
+        groupFlowFiles = f;
+    }
+
+    /**
+     * Specify a function that puts an input FlowFile group to a target storage using a given connection.
+     */
+    public void putFlowFiles(PutFlowFiles<FC, C, FFG> f) {
+        putFlowFiles = f;
+    }
+
+    /**
+     * Groups the fetched FlowFiles with the GroupFlowFiles function, then applies the PutFlowFiles function
+     * to each resulting group in order.
+     */
+    @Override
+    protected void putFlowFiles(ProcessContext context, ProcessSession session, FC functionContext,
+                               C connection, List<FlowFile> flowFiles, RoutingResult result) throws ProcessException {
+        final List<FFG> flowFileGroups = groupFlowFiles
+                .apply(context, session, functionContext, connection, flowFiles, result);
+
+        for (FFG group : flowFileGroups) {
+            putFlowFiles.apply(context, session, functionContext, connection, group, result);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/60d88b5a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/pattern/RollbackOnFailure.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/pattern/RollbackOnFailure.java b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/pattern/RollbackOnFailure.java
new file mode 100644
index 0000000..2d4d768
--- /dev/null
+++ b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/pattern/RollbackOnFailure.java
@@ -0,0 +1,226 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processor.util.pattern;
+
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSessionFactory;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.processor.util.pattern.PartialFunctions.AdjustRoute;
+
+import java.util.List;
+import java.util.Map;
+import java.util.function.BiFunction;
+
+/**
+ * <p>RollbackOnFailure can be used as a function context for process patterns such as {@link Put} to provide configurable error handling.
+ *
+ * <p>
+ *     RollbackOnFailure can add the following characteristics to a processor:
+ * </p>
+ * <ul>
+ *     <li>When disabled, input FlowFiles that caused an error will be routed to the 'failure' or 'retry' relationship, based on the type of error.</li>
+ *     <li>When enabled, input FlowFiles are kept in the input queue. A ProcessException is thrown to roll back the process session.</li>
+ *     <li>It assumes that anything that happened during a processor's onTrigger can be rolled back, if this is marked as transactional.</li>
+ *     <li>If transactional and enabled, it rolls back the session when an error occurs, even if some FlowFiles have already been processed.</li>
+ *     <li>If not transactional and enabled, it rolls back the session when an error occurs only if no progress has been made.</li>
+ * </ul>
+ *
+ * <p>There are two approaches to applying RollbackOnFailure. One is using {@link ExceptionHandler#adjustError(BiFunction)},
+ * and the other is implementing the processor's onTrigger using process patterns such as {@link Put#adjustRoute(AdjustRoute)}. </p>
+ *
+ * <p>It's also possible to use both approaches. ExceptionHandler can apply when an Exception is thrown immediately, while AdjustRoute responds later but requires less code.</p>
+ */
+public class RollbackOnFailure {
+
+    private final boolean rollbackOnFailure;
+    private final boolean transactional;
+    private boolean discontinue;
+
+    private int processedCount = 0;
+
+    /**
+     * Constructor.
+     * @param rollbackOnFailure Should be set by user via processor configuration.
+     * @param transactional Specify whether a processor is transactional.
+     *                      If not, it is important to call {@link #proceed()} after successful execution of processors task,
+     *                      that indicates processor made an operation that can not be undone.
+     */
+    public RollbackOnFailure(boolean rollbackOnFailure, boolean transactional) {
+        this.rollbackOnFailure = rollbackOnFailure;
+        this.transactional = transactional;
+    }
+
+    public static final PropertyDescriptor ROLLBACK_ON_FAILURE = createRollbackOnFailureProperty("");
+
+    public static  PropertyDescriptor createRollbackOnFailureProperty(String additionalDescription) {
+        return new PropertyDescriptor.Builder()
+                .name("rollback-on-failure")
+                .displayName("Rollback On Failure")
+                .description("Specify how to handle error." +
+                        " By default (false), if an error occurs while processing a FlowFile, the FlowFile will be routed to" +
+                        " 'failure' or 'retry' relationship based on error type, and processor can continue with next FlowFile." +
+                        " Instead, you may want to rollback currently processed FlowFiles and stop further processing immediately." +
+                        " In that case, you can do so by enabling this 'Rollback On Failure' property. " +
+                        " If enabled, failed FlowFiles will stay in the input relationship without penalizing it and being processed repeatedly" +
+                        " until it gets processed successfully or removed by other means." +
+                        " It is important to set adequate 'Yield Duration' to avoid retrying too frequently." + additionalDescription)
+                .allowableValues("true", "false")
+                .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
+                .defaultValue("false")
+                .required(true)
+                .build();
+    }
+
+    /**
+     * Create a function to use with {@link ExceptionHandler} that adjust error type based on functional context.
+     */
+    public static <FCT extends RollbackOnFailure> BiFunction<FCT, ErrorTypes, ErrorTypes.Result> createAdjustError(final ComponentLog logger) {
+        return (c, t) -> {
+
+            ErrorTypes.Result adjusted = null;
+            switch (t.destination()) {
+
+                case ProcessException:
+                    // If this process can rollback, then rollback it.
+                    if (!c.canRollback()) {
+                        // If an exception is thrown but the processor is not transactional and processed count > 0, adjust it to self,
+                        // in order to stop any further processing until this input is processed successfully.
+                        // If we throw an Exception in this state, the already succeeded FlowFiles will be rolled back, too.
+                        // In case the progress was made by other preceding inputs,
+                        // those successful inputs should be sent to 'success' and this input stays in incoming queue.
+                        // In case this input made some progress to external system, the partial update will be replayed again,
+                        // can cause duplicated data.
+                        c.discontinue();
+                        // We should not penalize a FlowFile, if we did, other FlowFiles can be fetched first.
+                        // We need to block others to be processed until this one finishes.
+                        adjusted = new ErrorTypes.Result(ErrorTypes.Destination.Self, ErrorTypes.Penalty.Yield);
+                    }
+                    break;
+
+                case Failure:
+                case Retry:
+                    if (c.isRollbackOnFailure()) {
+                        c.discontinue();
+                        if (c.canRollback()) {
+                            // If this process can rollback, then throw ProcessException instead, in order to rollback.
+                            adjusted = new ErrorTypes.Result(ErrorTypes.Destination.ProcessException, ErrorTypes.Penalty.Yield);
+                        } else {
+                            // If not,
+                            adjusted = new ErrorTypes.Result(ErrorTypes.Destination.Self, ErrorTypes.Penalty.Yield);
+                        }
+                    }
+                    break;
+            }
+
+            if (adjusted != null) {
+                if (logger.isDebugEnabled()) {
+                    logger.debug("Adjusted {} to {} based on context rollbackOnFailure={}, processedCount={}, transactional={}",
+                            new Object[]{t, adjusted, c.isRollbackOnFailure(), c.getProcessedCount(), c.isTransactional()});
+                }
+                return adjusted;
+            }
+
+            return t.result();
+        };
+    }
+
+    /**
+     * Create an {@link AdjustRoute} function to use with process pattern such as {@link Put} that adjust routed FlowFiles based on context.
+     * This function works as a safety net by covering cases that Processor implementation did not use ExceptionHandler and transfer FlowFiles
+     * without considering RollbackOnFailure context.
+     */
+    public static <FCT extends RollbackOnFailure> AdjustRoute<FCT> createAdjustRoute(Relationship ... failureRelationships) {
+        return (context, session, fc, result) -> {
+            if (fc.isRollbackOnFailure()) {
+                // Check if route contains failure relationship.
+                for (Relationship failureRelationship : failureRelationships) {
+                    if (!result.contains(failureRelationship)) {
+                        continue;
+                    }
+                    if (fc.canRollback()) {
+                        throw new ProcessException(String.format(
+                                "A FlowFile is routed to %s. Rollback session based on context rollbackOnFailure=%s, processedCount=%d, transactional=%s",
+                                failureRelationship.getName(), fc.isRollbackOnFailure(), fc.getProcessedCount(), fc.isTransactional()));
+                    } else {
+                        // Send failed FlowFiles to self.
+                        final Map<Relationship, List<FlowFile>> routedFlowFiles = result.getRoutedFlowFiles();
+                        final List<FlowFile> failedFlowFiles = routedFlowFiles.remove(failureRelationship);
+                        result.routeTo(failedFlowFiles, Relationship.SELF);
+                    }
+                }
+            }
+        };
+    }
+
+    public static <FCT extends RollbackOnFailure, I> ExceptionHandler.OnError<FCT, I> createOnError(ExceptionHandler.OnError<FCT, I> onError) {
+        return onError.andThen((context, input, result, e) -> {
+            if (context.shouldDiscontinue()) {
+                throw new DiscontinuedException("Discontinue processing due to " + e, e);
+            }
+        });
+    }
+
+    public static <FCT extends RollbackOnFailure> void onTrigger(
+            ProcessContext context, ProcessSessionFactory sessionFactory, FCT functionContext, ComponentLog logger,
+            PartialFunctions.OnTrigger onTrigger) throws ProcessException {
+
+        PartialFunctions.onTrigger(context, sessionFactory, logger, onTrigger, (session, t) -> {
+            // If RollbackOnFailure is enabled, do not penalize processing FlowFiles when rollback,
+            // in order to keep those in the incoming relationship to be processed again.
+            final boolean shouldPenalize = !functionContext.isRollbackOnFailure();
+            session.rollback(shouldPenalize);
+
+            // However, keeping failed FlowFile in the incoming relationship would retry it too often.
+            // So, administratively yield the process.
+            if (functionContext.isRollbackOnFailure()) {
+                logger.warn("Administratively yielding {} after rolling back due to {}", new Object[]{context.getName(), t}, t);
+                context.yield();
+            }
+        });
+    }
+
+    public int proceed() {
+        return ++processedCount;
+    }
+
+    public int getProcessedCount() {
+        return processedCount;
+    }
+
+    public boolean isRollbackOnFailure() {
+        return rollbackOnFailure;
+    }
+
+    public boolean isTransactional() {
+        return transactional;
+    }
+
+    public boolean canRollback() {
+        return transactional || processedCount == 0;
+    }
+
+    public boolean shouldDiscontinue() {
+        return discontinue;
+    }
+
+    public void discontinue() {
+        this.discontinue = true;
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/60d88b5a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/pattern/RoutingResult.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/pattern/RoutingResult.java b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/pattern/RoutingResult.java
new file mode 100644
index 0000000..200d893
--- /dev/null
+++ b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/pattern/RoutingResult.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processor.util.pattern;
+
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.Relationship;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class RoutingResult {
+
+    // FlowFiles routed so far, keyed by the relationship they were routed to.
+    private final Map<Relationship, List<FlowFile>> routedFlowFiles = new HashMap<>();
+
+    /** Adds a single FlowFile to the FlowFiles routed to the given relationship. */
+    public void routeTo(final FlowFile flowFile, final Relationship relationship) {
+        flowFilesOf(relationship).add(flowFile);
+    }
+
+    /** Adds all the given FlowFiles to the FlowFiles routed to the given relationship. */
+    public void routeTo(final List<FlowFile> flowFiles, final Relationship relationship) {
+        flowFilesOf(relationship).addAll(flowFiles);
+    }
+
+    /** Routes every FlowFile of the given result to the same relationship in this result. */
+    public void merge(final RoutingResult r) {
+        for (final Map.Entry<Relationship, List<FlowFile>> entry : r.getRoutedFlowFiles().entrySet()) {
+            routeTo(entry.getValue(), entry.getKey());
+        }
+    }
+
+    /** @return the backing map itself, not a copy; changes made to it are visible in this result */
+    public Map<Relationship, List<FlowFile>> getRoutedFlowFiles() {
+        return this.routedFlowFiles;
+    }
+
+    /** @return true if at least one FlowFile is routed to the given relationship */
+    public boolean contains(Relationship relationship) {
+        if (!routedFlowFiles.containsKey(relationship)) {
+            return false;
+        }
+        return !routedFlowFiles.get(relationship).isEmpty();
+    }
+
+    // Returns the list registered for the relationship, registering a new empty list first when there is none.
+    private List<FlowFile> flowFilesOf(final Relationship relationship) {
+        List<FlowFile> flowFiles = routedFlowFiles.get(relationship);
+        if (flowFiles == null) {
+            flowFiles = new ArrayList<>();
+            routedFlowFiles.put(relationship, flowFiles);
+        }
+        return flowFiles;
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/60d88b5a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/put/AbstractPutEventProcessor.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/put/AbstractPutEventProcessor.java b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/put/AbstractPutEventProcessor.java
new file mode 100644
index 0000000..65b11ff
--- /dev/null
+++ b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/put/AbstractPutEventProcessor.java
@@ -0,0 +1,575 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processor.util.put;
+
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.annotation.lifecycle.OnStopped;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.AbstractSessionFactoryProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.ProcessorInitializationContext;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.processor.util.put.sender.ChannelSender;
+import org.apache.nifi.processor.util.put.sender.DatagramChannelSender;
+import org.apache.nifi.processor.util.put.sender.SSLSocketChannelSender;
+import org.apache.nifi.processor.util.put.sender.SocketChannelSender;
+import org.apache.nifi.ssl.SSLContextService;
+
+import javax.net.ssl.SSLContext;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * A base class for processors that send data to an external system using TCP or UDP.
+ */
+public abstract class AbstractPutEventProcessor extends AbstractSessionFactoryProcessor {
+
+    public static final PropertyDescriptor HOSTNAME = new PropertyDescriptor.Builder()
+            .name("Hostname")
+            .description("The ip address or hostname of the destination.")
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .defaultValue("localhost")
+            .required(true)
+            .expressionLanguageSupported(true)
+            .build();
+    public static final PropertyDescriptor PORT = new PropertyDescriptor
+            .Builder().name("Port")
+            .description("The port on the destination.")
+            .required(true)
+            .addValidator(StandardValidators.PORT_VALIDATOR)
+            .expressionLanguageSupported(true)
+            .build();
+    public static final PropertyDescriptor MAX_SOCKET_SEND_BUFFER_SIZE = new PropertyDescriptor.Builder()
+            .name("Max Size of Socket Send Buffer")
+            .description("The maximum size of the socket send buffer that should be used. This is a suggestion to the Operating System " +
+                    "to indicate how big the socket buffer should be. If this value is set too low, the buffer may fill up before " +
+                    "the data can be read, and incoming data will be dropped.")
+            .addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
+            .defaultValue("1 MB")
+            .required(true)
+            .build();
+    public static final PropertyDescriptor IDLE_EXPIRATION = new PropertyDescriptor
+            .Builder().name("Idle Connection Expiration")
+            .description("The amount of time a connection should be held open without being used before closing the connection.")
+            .required(true)
+            .defaultValue("5 seconds")
+            .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
+            .build();
+
+    // Putting these properties here so sub-classes don't have to redefine them, but they are
+    // not added to the properties by default since not all processors may need them
+
+    public static final AllowableValue TCP_VALUE = new AllowableValue("TCP", "TCP");
+    public static final AllowableValue UDP_VALUE = new AllowableValue("UDP", "UDP");
+
+    public static final PropertyDescriptor PROTOCOL = new PropertyDescriptor
+            .Builder().name("Protocol")
+            .description("The protocol for communication.")
+            .required(true)
+            .allowableValues(TCP_VALUE, UDP_VALUE)
+            .defaultValue(TCP_VALUE.getValue())
+            .build();
+    public static final PropertyDescriptor MESSAGE_DELIMITER = new PropertyDescriptor.Builder()
+            .name("Message Delimiter")
+            .description("Specifies the delimiter to use for splitting apart multiple messages within a single FlowFile. "
+                    + "If not specified, the entire content of the FlowFile will be used as a single message. "
+                    + "If specified, the contents of the FlowFile will be split on this delimiter and each section "
+                    + "sent as a separate message. Note that if messages are delimited and some messages for a given FlowFile "
+                    + "are transferred successfully while others are not, the messages will be split into individual FlowFiles, such that those "
+                    + "messages that were successfully sent are routed to the 'success' relationship while other messages are sent to the 'failure' "
+                    + "relationship.")
+            .required(false)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .expressionLanguageSupported(true)
+            .build();
+    public static final PropertyDescriptor CHARSET = new PropertyDescriptor.Builder()
+            .name("Character Set")
+            .description("Specifies the character set of the data being sent.")
+            .required(true)
+            .defaultValue("UTF-8")
+            .addValidator(StandardValidators.CHARACTER_SET_VALIDATOR)
+            .build();
+    public static final PropertyDescriptor TIMEOUT = new PropertyDescriptor.Builder()
+            .name("Timeout")
+            .description("The timeout for connecting to and communicating with the destination. Does not apply to UDP")
+            .required(false)
+            .defaultValue("10 seconds")
+            .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
+            .build();
+    public static final PropertyDescriptor OUTGOING_MESSAGE_DELIMITER = new PropertyDescriptor.Builder()
+            .name("Outgoing Message Delimiter")
+            .description("Specifies the delimiter to use when sending messages out over the same TCP stream. The delimiter is appended to each FlowFile message "
+                    + "that is transmitted over the stream so that the receiver can determine when one message ends and the next message begins. Users should "
+                    + "ensure that the FlowFile content does not contain the delimiter character to avoid errors. In order to use a new line character you can "
+                    + "enter '\\n'. For a tab character use '\\t'. Finally for a carriage return use '\\r'.")
+            .required(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .defaultValue("\\n")
+            .expressionLanguageSupported(true)
+            .build();
+    public static final PropertyDescriptor CONNECTION_PER_FLOWFILE = new PropertyDescriptor.Builder()
+            .name("Connection Per FlowFile")
+            .description("Specifies whether to send each FlowFile's content on an individual connection.")
+            .required(true)
+            .defaultValue("false")
+            .allowableValues("true", "false")
+            .build();
+
+    public static final PropertyDescriptor SSL_CONTEXT_SERVICE = new PropertyDescriptor.Builder()
+            .name("SSL Context Service")
+            .description("The Controller Service to use in order to obtain an SSL Context. If this property is set, " +
+                    "messages will be sent over a secure connection.")
+            .required(false)
+            .identifiesControllerService(SSLContextService.class)
+            .build();
+
+    public static final Relationship REL_SUCCESS = new Relationship.Builder()
+            .name("success")
+            .description("FlowFiles that are sent successfully to the destination are sent out this relationship.")
+            .build();
+    public static final Relationship REL_FAILURE = new Relationship.Builder()
+            .name("failure")
+            .description("FlowFiles that failed to send to the destination are sent out this relationship.")
+            .build();
+
+    private Set<Relationship> relationships;
+    private List<PropertyDescriptor> descriptors;
+
+    protected volatile String transitUri;
+    protected volatile BlockingQueue<ChannelSender> senderPool;
+
+    protected final BlockingQueue<FlowFileMessageBatch> completeBatches = new LinkedBlockingQueue<>();
+    protected final Set<FlowFileMessageBatch> activeBatches = Collections.synchronizedSet(new HashSet<FlowFileMessageBatch>());
+
+    /**
+     * Builds the unmodifiable property and relationship collections of this processor: the common ones
+     * declared by this class first, then whatever the sub-class contributes.
+     */
+    @Override
+    protected void init(final ProcessorInitializationContext context) {
+        final List<PropertyDescriptor> supportedProperties = new ArrayList<>();
+        Collections.addAll(supportedProperties, HOSTNAME, PORT, MAX_SOCKET_SEND_BUFFER_SIZE, IDLE_EXPIRATION);
+        supportedProperties.addAll(getAdditionalProperties());
+        this.descriptors = Collections.unmodifiableList(supportedProperties);
+
+        final Set<Relationship> supportedRelationships = new HashSet<>();
+        Collections.addAll(supportedRelationships, REL_SUCCESS, REL_FAILURE);
+        supportedRelationships.addAll(getAdditionalRelationships());
+        this.relationships = Collections.unmodifiableSet(supportedRelationships);
+    }
+
+    /**
+     * Override to provide additional relationships for the processor.
+     *
+     * @return a list of relationships
+     */
+    protected List<Relationship> getAdditionalRelationships() {
+        // Collections.emptyList() is the type-safe form of the raw Collections.EMPTY_LIST constant:
+        // same immutable empty list, but without an unchecked conversion to List<Relationship>.
+        return Collections.emptyList();
+    }
+
+    /**
+     * Override to provide additional properties for the processor.
+     *
+     * @return a list of properties
+     */
+    protected List<PropertyDescriptor> getAdditionalProperties() {
+        // Collections.emptyList() is the type-safe form of the raw Collections.EMPTY_LIST constant:
+        // same immutable empty list, but without an unchecked conversion to List<PropertyDescriptor>.
+        return Collections.emptyList();
+    }
+
+    /** @return the unmodifiable set of relationships assembled in {@code init} */
+    @Override
+    public final Set<Relationship> getRelationships() {
+        return relationships;
+    }
+
+    /** @return the unmodifiable list of property descriptors assembled in {@code init} */
+    @Override
+    public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return this.descriptors;
+    }
+
+    /**
+     * Prepares an empty sender pool bounded by the maximum number of concurrent tasks, and computes the transit uri.
+     * Senders are not created here; they are created on the fly in onTrigger.
+     */
+    @OnScheduled
+    public void onScheduled(final ProcessContext context) throws IOException {
+        final int maxConcurrentTasks = context.getMaxConcurrentTasks();
+        this.senderPool = new LinkedBlockingQueue<>(maxConcurrentTasks);
+        this.transitUri = createTransitUri(context);
+    }
+
+    /** Drains the sender pool, closing every sender found in it. Does nothing if the pool was never created. */
+    @OnStopped
+    public void closeSenders() {
+        if (senderPool == null) {
+            return;
+        }
+        for (ChannelSender pooled = senderPool.poll(); pooled != null; pooled = senderPool.poll()) {
+            pooled.close();
+        }
+    }
+
+    /**
+     * Sub-classes construct a transit uri for provenance events. Called from @OnScheduled
+     * method of this class.
+     *
+     * @param context the current context
+     *
+     * @return the transit uri
+     */
+    protected abstract String createTransitUri(final ProcessContext context);
+
+    /**
+     * Sub-classes create a ChannelSender given a context.
+     *
+     * @param context the current context
+     * @return an implementation of ChannelSender
+     * @throws IOException if an error occurs creating the ChannelSender
+     */
+    protected abstract ChannelSender createSender(final ProcessContext context) throws IOException;
+
+    /**
+     * Close any senders that haven't been active within the given threshold.
+     *
+     * @param idleThreshold the threshold to consider a sender as idle, on the same scale as System.currentTimeMillis()
+     */
+    protected void pruneIdleSenders(final long idleThreshold) {
+        final long now = System.currentTimeMillis();
+        final List<ChannelSender> stillActive = new ArrayList<>();
+
+        // Drain the pool: a sender whose last use is older than the threshold is closed, the others are set aside.
+        for (ChannelSender candidate = senderPool.poll(); candidate != null; candidate = senderPool.poll()) {
+            final boolean idle = now > (candidate.getLastUsed() + idleThreshold);
+            if (idle) {
+                getLogger().debug("Closing idle connection...");
+                candidate.close();
+            } else {
+                stillActive.add(candidate);
+            }
+        }
+
+        // Hand the active senders back to the pool; a sender the pool does not accept is closed.
+        for (final ChannelSender active : stillActive) {
+            if (!senderPool.offer(active)) {
+                active.close();
+            }
+        }
+    }
+
+    /**
+     * Helper for sub-classes to create a sender.
+     *
+     * @param protocol the protocol for the sender
+     * @param host the host to send to
+     * @param port the port to send to
+     * @param timeout the timeout for connecting and communicating over the channel
+     * @param maxSendBufferSize the maximum size of the socket send buffer
+     * @param sslContext an SSLContext, or null if not using SSL
+     *
+     * @return a ChannelSender based on the given properties
+     *
+     * @throws IOException if an error occurs creating the sender
+     */
+    protected ChannelSender createSender(final String protocol,
+                                         final String host,
+                                         final int port,
+                                         final int timeout,
+                                         final int maxSendBufferSize,
+                                         final SSLContext sslContext) throws IOException {
+
+        ChannelSender sender;
+        if (protocol.equals(UDP_VALUE.getValue())) {
+            sender = new DatagramChannelSender(host, port, maxSendBufferSize, getLogger());
+        } else {
+            // if an SSLContextService is provided then we make a secure sender
+            if (sslContext != null) {
+                sender = new SSLSocketChannelSender(host, port, maxSendBufferSize, sslContext, getLogger());
+            } else {
+                sender = new SocketChannelSender(host, port, maxSendBufferSize, getLogger());
+            }
+        }
+
+        sender.setTimeout(timeout);
+        sender.open();
+        return sender;
+    }
+
+    /**
+     * Helper method to acquire an available ChannelSender from the pool. If the pool is empty then a new sender is created.
+     *
+     * @param context
+     *            - the current process context.
+     *
+     * @param session
+     *            - the current process session.
+     * @param flowFile
+     *            - the FlowFile being processed in this session.
+     *
+     * @return ChannelSender - the sender that has been acquired or null if no sender is available and a new sender cannot be created.
+     */
+    protected ChannelSender acquireSender(final ProcessContext context, final ProcessSession session, final FlowFile flowFile) {
+        ChannelSender sender = senderPool.poll();
+        if (sender == null) {
+            try {
+                getLogger().debug("No available connections, creating a new one...");
+                sender = createSender(context);
+            } catch (IOException e) {
+                getLogger().error("No available connections, and unable to create a new one, transferring {} to failure",
+                        new Object[]{flowFile}, e);
+                session.transfer(flowFile, REL_FAILURE);
+                session.commit();
+                context.yield();
+                sender = null;
+            }
+        }
+
+        return sender;
+    }
+
+
+    /**
+     * Helper method to relinquish the ChannelSender back to the pool. If the sender is disconnected or the pool is full
+     * then the sender is closed and discarded.
+     *
+     * @param sender the sender to return or close
+     */
+    protected void relinquishSender(final ChannelSender sender) {
+        if (sender != null) {
+            // if the connection is still open then then try to return the sender to the pool.
+            if (sender.isConnected()) {
+                boolean returned = senderPool.offer(sender);
+                // if the pool is full then close the sender.
+                if (!returned) {
+                    sender.close();
+                }
+            } else {
+                // probably already closed here, but quietly close anyway to be safe.
+                sender.close();
+            }
+        }
+    }
+
+    /**
+     * Represents a range of messages from a FlowFile.
+     */
+    protected static class Range {
+        // Both bounds are fixed at construction time; the class performs no validation of them.
+        private final long start;
+        private final long end;
+
+        /**
+         * @param start the value later returned by {@link #getStart()}
+         * @param end the value later returned by {@link #getEnd()}
+         */
+        public Range(final long start, final long end) {
+            this.end = end;
+            this.start = start;
+        }
+
+        /**
+         * @return the start bound supplied to the constructor
+         */
+        public long getStart() {
+            return this.start;
+        }
+
+        /**
+         * @return the end bound supplied to the constructor
+         */
+        public long getEnd() {
+            return this.end;
+        }
+
+        /**
+         * @return a description of the form {@code Range[start-end]}
+         */
+        @Override
+        public String toString() {
+            final StringBuilder text = new StringBuilder("Range[");
+            text.append(start).append("-").append(end).append("]");
+            return text.toString();
+        }
+    }
+
+    /**
+     * A wrapper to hold the ranges of a FlowFile that were successful and ranges that failed, and then
+     * transfer those ranges appropriately.
+     */
+    protected class FlowFileMessageBatch {
+
+        private final ProcessSession session;
+        private final FlowFile flowFile;
+        // Captured at construction; used to compute the transfer duration reported on full success.
+        private final long startTime = System.nanoTime();
+
+        private final List<Range> successfulRanges = new ArrayList<>();
+        private final List<Range> failedRanges = new ArrayList<>();
+
+        private Exception lastFailureReason;
+        // Stays at -1 until setNumMessages is called; the batch cannot be complete before then.
+        private long numMessages = -1L;
+        private long completeTime = 0L;
+        private boolean canceled = false;
+
+        /**
+         * @param session the session that owns the FlowFile and is committed or rolled back by this batch
+         * @param flowFile the FlowFile whose messages are being tracked
+         */
+        public FlowFileMessageBatch(final ProcessSession session, final FlowFile flowFile) {
+            this.session = session;
+            this.flowFile = flowFile;
+        }
+
+        /**
+         * Completes the session when every expected range has been reported; otherwise marks the batch
+         * as canceled, rolls the session back and discards all ranges recorded so far.
+         */
+        public synchronized void cancelOrComplete() {
+            if (!isComplete()) {
+                this.canceled = true;
+
+                session.rollback();
+                successfulRanges.clear();
+                failedRanges.clear();
+                return;
+            }
+
+            completeSession();
+        }
+
+        /**
+         * Records a range that was sent successfully. Ignored once the batch has been canceled.
+         *
+         * @param start start of the range
+         * @param end end of the range
+         */
+        public synchronized void addSuccessfulRange(final long start, final long end) {
+            if (!canceled) {
+                successfulRanges.add(new Range(start, end));
+                moveToCompleteIfDone();
+            }
+        }
+
+        /**
+         * Records a range that could not be sent and remembers the cause as the most recent failure.
+         * Ignored once the batch has been canceled.
+         *
+         * @param start start of the range
+         * @param end end of the range
+         * @param e the reason the range failed
+         */
+        public synchronized void addFailedRange(final long start, final long end, final Exception e) {
+            if (!canceled) {
+                failedRanges.add(new Range(start, end));
+                lastFailureReason = e;
+                moveToCompleteIfDone();
+            }
+        }
+
+        /**
+         * @return true when the batch is not canceled, the expected message count is known, and at least
+         *         that many ranges (successful plus failed) have been recorded
+         */
+        private boolean isComplete() {
+            if (canceled || numMessages <= -1) {
+                return false;
+            }
+
+            final int reportedRanges = successfulRanges.size() + failedRanges.size();
+            return reportedRanges >= numMessages;
+        }
+
+        /**
+         * Sets how many messages are expected for this FlowFile.
+         *
+         * @param msgCount the expected number of messages
+         */
+        public synchronized void setNumMessages(final long msgCount) {
+            this.numMessages = msgCount;
+            moveToCompleteIfDone();
+        }
+
+        /**
+         * When the batch has become complete, moves it from the enclosing instance's activeBatches to its
+         * completeBatches and stamps the completion time. Callers already hold this batch's monitor.
+         */
+        private void moveToCompleteIfDone() {
+            if (!isComplete()) {
+                return;
+            }
+
+            activeBatches.remove(this);
+            completeBatches.add(this);
+            completeTime = System.nanoTime();
+        }
+
+        /**
+         * Sorts the given ranges by start, merges runs in which one range starts exactly where the previous
+         * one ends, clones the FlowFile once per merged range and transfers each clone to the relationship.
+         * Clones headed anywhere other than REL_SUCCESS are penalized first; clones headed to REL_SUCCESS
+         * get a provenance SEND event instead.
+         *
+         * @param ranges the ranges to transfer; sorted in place
+         * @param relationship the relationship every resulting child FlowFile is transferred to
+         */
+        private void transferRanges(final List<Range> ranges, final Relationship relationship) {
+            final Comparator<Range> byStart = new Comparator<Range>() {
+                @Override
+                public int compare(final Range left, final Range right) {
+                    return Long.compare(left.getStart(), right.getStart());
+                }
+            };
+            Collections.sort(ranges, byStart);
+
+            int index = 0;
+            while (index < ranges.size()) {
+                Range merged = ranges.get(index);
+                int count = 1;
+
+                // Absorb every following range that continues exactly where the merged range currently ends.
+                int next = index + 1;
+                while (next < ranges.size() && ranges.get(next).getStart() == merged.getEnd()) {
+                    merged = new Range(merged.getStart(), ranges.get(next).getEnd());
+                    count++;
+                    next++;
+                }
+
+                // Create a FlowFile for the merged range: offset is the start, size is end minus start.
+                FlowFile child = session.clone(flowFile, merged.getStart(), merged.getEnd() - merged.getStart());
+                if (relationship == REL_SUCCESS) {
+                    session.getProvenanceReporter().send(child, transitUri, "Sent " + count + " messages");
+                } else {
+                    child = session.penalize(child);
+                }
+                session.transfer(child, relationship);
+
+                index = next;
+            }
+        }
+
+        /**
+         * Routes the FlowFile according to the recorded ranges and commits the session. Does nothing when
+         * the batch has been canceled.
+         * <ul>
+         * <li>no ranges at all: the FlowFile goes to REL_SUCCESS</li>
+         * <li>only failed ranges: the FlowFile is penalized and goes to REL_FAILURE</li>
+         * <li>only successful ranges: a provenance SEND event is emitted and the FlowFile goes to REL_SUCCESS</li>
+         * <li>both kinds: the FlowFile is split into children per range group and the original is removed</li>
+         * </ul>
+         */
+        public synchronized void completeSession() {
+            if (canceled) {
+                return;
+            }
+
+            final boolean noSuccesses = successfulRanges.isEmpty();
+            final boolean noFailures = failedRanges.isEmpty();
+
+            if (noSuccesses && noFailures) {
+                getLogger().info("Completed processing {} but sent 0 FlowFiles", new Object[] {flowFile});
+                session.transfer(flowFile, REL_SUCCESS);
+            } else if (noSuccesses) {
+                getLogger().error("Failed to send {}; routing to 'failure'; last failure reason reported was {};", new Object[] {flowFile, lastFailureReason});
+                final FlowFile penalizedFlowFile = session.penalize(flowFile);
+                session.transfer(penalizedFlowFile, REL_FAILURE);
+            } else if (noFailures) {
+                final long transferMillis = TimeUnit.NANOSECONDS.toMillis(completeTime - startTime);
+                session.getProvenanceReporter().send(flowFile, transitUri, "Sent " + successfulRanges.size() + " messages;", transferMillis);
+                session.transfer(flowFile, REL_SUCCESS);
+                getLogger().info("Successfully sent {} messages for {} in {} millis", new Object[] {successfulRanges.size(), flowFile, transferMillis});
+            } else {
+                // Some messages made it and some did not: split the source FlowFile into children, send the
+                // successful ones to 'success' and the failed ones to 'failure', then drop the original.
+                transferRanges(successfulRanges, REL_SUCCESS);
+                transferRanges(failedRanges, REL_FAILURE);
+                session.remove(flowFile);
+                getLogger().error("Successfully sent {} messages, but failed to send {} messages; the last error received was {}",
+                        new Object[] {successfulRanges.size(), failedRanges.size(), lastFailureReason});
+            }
+
+            session.commit();
+        }
+    }
+
+    /**
+     * Gets the current value of the "Outgoing Message Delimiter" property and parses the special characters.
+     *
+     * @param context
+     *            - the current process context.
+     * @param flowFile
+     *            - the FlowFile being processed.
+     *
+     * @return String containing the Delimiter value.
+     */
+    protected String getOutgoingMessageDelimiter(final ProcessContext context, final FlowFile flowFile) {
+        String delimiter = context.getProperty(OUTGOING_MESSAGE_DELIMITER).evaluateAttributeExpressions(flowFile).getValue();
+        if (delimiter != null) {
+            delimiter = delimiter.replace("\\n", "\n").replace("\\r", "\r").replace("\\t", "\t");
+        }
+        return delimiter;
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/60d88b5a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/put/sender/ChannelSender.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/put/sender/ChannelSender.java b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/put/sender/ChannelSender.java
new file mode 100644
index 0000000..278a9ab
--- /dev/null
+++ b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/put/sender/ChannelSender.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processor.util.put.sender;
+
+import org.apache.nifi.logging.ComponentLog;
+import java.io.IOException;
+import java.nio.charset.Charset;
+
+/**
+ * Base class for sending messages over a channel.
+ */
+public abstract class ChannelSender {
+
+    protected final int port;
+    protected final String host;
+    protected final int maxSendBufferSize;
+    protected final ComponentLog logger;
+
+    // NOTE(review): the unit of timeout is not stated in this class; presumably milliseconds - confirm.
+    protected volatile int timeout = 10000;
+    // Value of System.currentTimeMillis() taken after the most recent successful send.
+    protected volatile long lastUsed;
+
+    /**
+     * @param host the destination host
+     * @param port the destination port
+     * @param maxSendBufferSize the requested send buffer size, made available to subclasses
+     * @param logger the logger made available to subclasses
+     */
+    public ChannelSender(final String host, final int port, final int maxSendBufferSize, final ComponentLog logger) {
+        this.host = host;
+        this.port = port;
+        this.logger = logger;
+        this.maxSendBufferSize = maxSendBufferSize;
+    }
+
+    /**
+     * @param timeout the new timeout value
+     */
+    public void setTimeout(final int timeout) {
+        this.timeout = timeout;
+    }
+
+    /**
+     * @return the current timeout value
+     */
+    public int getTimeout() {
+        return this.timeout;
+    }
+
+    /**
+     * @return the time, as reported by {@link System#currentTimeMillis()}, at which data was last
+     *         sent successfully over this channel
+     */
+    public long getLastUsed() {
+        return this.lastUsed;
+    }
+
+    /**
+     * Opens the connection to the destination.
+     *
+     * @throws IOException if an error occurred opening the connection.
+     */
+    public abstract void open() throws IOException;
+
+    /**
+     * Encodes the given string with the given charset and sends the resulting bytes over the channel.
+     *
+     * @param message the message to send over the channel
+     * @param charset the charset used to turn the message into bytes
+     * @throws IOException if there was an error communicating over the channel
+     */
+    public void send(final String message, final Charset charset) throws IOException {
+        send(message.getBytes(charset));
+    }
+
+    /**
+     * Sends the given data over the channel and records the time of the send. If writing fails with an
+     * IOException the channel is closed before the exception is propagated.
+     *
+     * @param data the data to send over the channel
+     * @throws IOException if there was an error communicating over the channel
+     */
+    public void send(final byte[] data) throws IOException {
+        try {
+            write(data);
+        } catch (final IOException e) {
+            // The channel is closed on failure to force the creation of a new one next time.
+            close();
+            throw e;
+        }
+
+        lastUsed = System.currentTimeMillis();
+    }
+
+    /**
+     * Write the given buffer to the underlying channel.
+     *
+     * @param data the bytes to write
+     * @throws IOException if the write fails
+     */
+    protected abstract void write(byte[] data) throws IOException;
+
+    /**
+     * @return true if the underlying channel is connected
+     */
+    public abstract boolean isConnected();
+
+    /**
+     * Close the underlying channel
+     */
+    public abstract void close();
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/60d88b5a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/put/sender/DatagramChannelSender.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/put/sender/DatagramChannelSender.java b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/put/sender/DatagramChannelSender.java
new file mode 100644
index 0000000..0b2dfb8
--- /dev/null
+++ b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/put/sender/DatagramChannelSender.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processor.util.put.sender;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.nifi.logging.ComponentLog;
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.StandardSocketOptions;
+import java.nio.ByteBuffer;
+import java.nio.channels.DatagramChannel;
+
+/**
+ * Sends messages over a DatagramChannel.
+ */
+public class DatagramChannelSender extends ChannelSender {
+
+    // Null until open() succeeds in creating a channel, and again after close().
+    private DatagramChannel channel;
+
+    /**
+     * @param host the destination host
+     * @param port the destination port
+     * @param maxSendBufferSize the requested SO_SNDBUF size in bytes; values of zero or less leave the default untouched
+     * @param logger the logger used to warn when the requested buffer size could not be applied
+     */
+    public DatagramChannelSender(final String host, final int port, final int maxSendBufferSize, final ComponentLog logger) {
+        super(host, port, maxSendBufferSize, logger);
+    }
+
+    /**
+     * Creates the datagram channel if necessary, applies the requested send buffer size and connects the
+     * channel to the configured host and port. If any step fails, the channel is closed and discarded
+     * before the exception is propagated, so a failed open does not leave an open channel behind.
+     *
+     * @throws IOException if the channel could not be opened, configured or connected
+     */
+    @Override
+    public void open() throws IOException {
+        try {
+            if (channel == null) {
+                channel = DatagramChannel.open();
+
+                if (maxSendBufferSize > 0) {
+                    channel.setOption(StandardSocketOptions.SO_SNDBUF, maxSendBufferSize);
+                    // Read the option back to see what size was actually applied.
+                    final int actualSendBufSize = channel.getOption(StandardSocketOptions.SO_SNDBUF);
+                    if (actualSendBufSize < maxSendBufferSize) {
+                        logger.warn("Attempted to set Socket Send Buffer Size to " + maxSendBufferSize
+                                + " bytes but could only set to " + actualSendBufSize + " bytes. You may want to "
+                                + "consider changing the Operating System's maximum send buffer");
+                    }
+                }
+            }
+
+            if (!channel.isConnected()) {
+                channel.connect(new InetSocketAddress(InetAddress.getByName(host), port));
+            }
+        } catch (final IOException | RuntimeException e) {
+            // Callers that give up on a sender after a failed open() do not necessarily call close(),
+            // so release the channel here rather than leaving it open.
+            close();
+            throw e;
+        }
+    }
+
+    /**
+     * Writes the given bytes to the channel, looping until the wrapping buffer has been drained.
+     *
+     * @param data the bytes to write
+     * @throws IOException if the write fails
+     */
+    @Override
+    protected void write(byte[] data) throws IOException {
+        ByteBuffer buffer = ByteBuffer.wrap(data);
+        while (buffer.hasRemaining()) {
+            channel.write(buffer);
+        }
+    }
+
+    /**
+     * @return true if a channel exists and reports itself connected
+     */
+    @Override
+    public boolean isConnected() {
+        return channel != null && channel.isConnected();
+    }
+
+    /**
+     * Quietly closes the channel, if any, and forgets it so that a later open() creates a new one.
+     */
+    @Override
+    public void close() {
+        IOUtils.closeQuietly(channel);
+        channel = null;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/60d88b5a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/put/sender/SSLSocketChannelSender.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/put/sender/SSLSocketChannelSender.java b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/put/sender/SSLSocketChannelSender.java
new file mode 100644
index 0000000..a70c9c5
--- /dev/null
+++ b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/put/sender/SSLSocketChannelSender.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processor.util.put.sender;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.remote.io.socket.ssl.SSLSocketChannel;
+
+import javax.net.ssl.SSLContext;
+import java.io.IOException;
+
+/**
+ * Sends messages over an SSLSocketChannel.
+ */
+public class SSLSocketChannelSender extends SocketChannelSender {
+
+    // Null until open() has wrapped the underlying channel, and again after close().
+    private SSLSocketChannel sslChannel;
+    private final SSLContext sslContext;
+
+    /**
+     * @param host the destination host
+     * @param port the destination port
+     * @param maxSendBufferSize the requested send buffer size, passed to the superclass
+     * @param sslContext the SSL context used to create the SSL channel
+     * @param logger the logger passed to the superclass
+     */
+    public SSLSocketChannelSender(final String host,
+                                  final int port,
+                                  final int maxSendBufferSize,
+                                  final SSLContext sslContext,
+                                  final ComponentLog logger) {
+        super(host, port, maxSendBufferSize, logger);
+        this.sslContext = sslContext;
+    }
+
+    /**
+     * Opens the underlying channel through the superclass, wraps it in an SSLSocketChannel on first use,
+     * applies the current timeout and connects the SSL channel. If any step fails, both the SSL channel
+     * and the underlying channel are closed before the exception is propagated, so a failed open does
+     * not leave an open socket behind.
+     *
+     * @throws IOException if the underlying channel or the SSL channel could not be opened or connected
+     */
+    @Override
+    public void open() throws IOException {
+        try {
+            if (sslChannel == null) {
+                super.open();
+                sslChannel = new SSLSocketChannel(sslContext, channel, true);
+            }
+            sslChannel.setTimeout(timeout);
+
+            // SSLSocketChannel will check if already connected so we can safely call this
+            sslChannel.connect();
+        } catch (final IOException | RuntimeException e) {
+            // Callers that give up on a sender after a failed open() do not necessarily call close(),
+            // so release the socket here rather than leaving it open.
+            close();
+            throw e;
+        }
+    }
+
+    /**
+     * Writes the given bytes through the SSL channel.
+     *
+     * @param data the bytes to write
+     * @throws IOException if the write fails
+     */
+    @Override
+    protected void write(byte[] data) throws IOException {
+        sslChannel.write(data);
+    }
+
+    /**
+     * @return true if the SSL channel exists and does not report itself closed
+     */
+    @Override
+    public boolean isConnected() {
+        return sslChannel != null && !sslChannel.isClosed();
+    }
+
+    /**
+     * Closes the underlying channel through the superclass, then quietly closes the SSL channel and
+     * forgets it so that a later open() creates a new one.
+     */
+    @Override
+    public void close() {
+        super.close();
+        IOUtils.closeQuietly(sslChannel);
+        sslChannel = null;
+    }
+}