You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by pv...@apache.org on 2017/07/19 10:25:50 UTC

nifi git commit: NIFI-4157 Improvements to PutTCP

Repository: nifi
Updated Branches:
  refs/heads/master 59754500d -> f87d2a2f5


NIFI-4157 Improvements to PutTCP

Signed-off-by: Bryan Bende <bb...@apache.org>
Signed-off-by: Pierre Villard <pi...@gmail.com>

This closes #1989.


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/f87d2a2f
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/f87d2a2f
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/f87d2a2f

Branch: refs/heads/master
Commit: f87d2a2f572692a7e0a4078822539f424a0438e7
Parents: 5975450
Author: Bryan Bende <bb...@apache.org>
Authored: Thu Jul 6 15:40:50 2017 -0400
Committer: Pierre Villard <pi...@gmail.com>
Committed: Wed Jul 19 12:25:42 2017 +0200

----------------------------------------------------------------------
 .../util/put/AbstractPutEventProcessor.java     |  3 +-
 .../util/put/sender/SSLSocketChannelSender.java | 13 ++-
 .../util/put/sender/SocketChannelSender.java    |  6 ++
 .../apache/nifi/processors/standard/PutTCP.java | 97 ++++++++------------
 4 files changed, 58 insertions(+), 61 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/f87d2a2f/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/put/AbstractPutEventProcessor.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/put/AbstractPutEventProcessor.java b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/put/AbstractPutEventProcessor.java
index 65b11ff..5833819 100644
--- a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/put/AbstractPutEventProcessor.java
+++ b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/put/AbstractPutEventProcessor.java
@@ -128,9 +128,8 @@ public abstract class AbstractPutEventProcessor extends AbstractSessionFactoryPr
                     + "that is transmitted over the stream so that the receiver can determine when one message ends and the next message begins. Users should "
                     + "ensure that the FlowFile content does not contain the delimiter character to avoid errors. In order to use a new line character you can "
                     + "enter '\\n'. For a tab character use '\\t'. Finally for a carriage return use '\\r'.")
-            .required(true)
+            .required(false)
             .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
-            .defaultValue("\\n")
             .expressionLanguageSupported(true)
             .build();
     public static final PropertyDescriptor CONNECTION_PER_FLOWFILE = new PropertyDescriptor.Builder()

http://git-wip-us.apache.org/repos/asf/nifi/blob/f87d2a2f/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/put/sender/SSLSocketChannelSender.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/put/sender/SSLSocketChannelSender.java b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/put/sender/SSLSocketChannelSender.java
index a70c9c5..70771f1 100644
--- a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/put/sender/SSLSocketChannelSender.java
+++ b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/put/sender/SSLSocketChannelSender.java
@@ -19,17 +19,20 @@ package org.apache.nifi.processor.util.put.sender;
 import org.apache.commons.io.IOUtils;
 import org.apache.nifi.logging.ComponentLog;
 import org.apache.nifi.remote.io.socket.ssl.SSLSocketChannel;
+import org.apache.nifi.remote.io.socket.ssl.SSLSocketChannelOutputStream;
 
 import javax.net.ssl.SSLContext;
 import java.io.IOException;
+import java.io.OutputStream;
 
 /**
  * Sends messages over an SSLSocketChannel.
  */
 public class SSLSocketChannelSender extends SocketChannelSender {
 
-    private SSLSocketChannel sslChannel;
     private SSLContext sslContext;
+    private SSLSocketChannel sslChannel;
+    private SSLSocketChannelOutputStream sslOutputStream;
 
     public SSLSocketChannelSender(final String host,
                                   final int port,
@@ -50,6 +53,7 @@ public class SSLSocketChannelSender extends SocketChannelSender {
 
         // SSLSocketChannel will check if already connected so we can safely call this
         sslChannel.connect();
+        sslOutputStream = new SSLSocketChannelOutputStream(sslChannel);
     }
 
     @Override
@@ -65,7 +69,14 @@ public class SSLSocketChannelSender extends SocketChannelSender {
     @Override
     public void close() {
         super.close();
+        IOUtils.closeQuietly(sslOutputStream);
         IOUtils.closeQuietly(sslChannel);
         sslChannel = null;
     }
+
+    @Override
+    public OutputStream getOutputStream() {
+        return sslOutputStream;
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/f87d2a2f/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/put/sender/SocketChannelSender.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/put/sender/SocketChannelSender.java b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/put/sender/SocketChannelSender.java
index 8d4f875..6f7796b 100644
--- a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/put/sender/SocketChannelSender.java
+++ b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/put/sender/SocketChannelSender.java
@@ -21,6 +21,7 @@ import org.apache.nifi.logging.ComponentLog;
 import org.apache.nifi.remote.io.socket.SocketChannelOutputStream;
 
 import java.io.IOException;
+import java.io.OutputStream;
 import java.net.InetAddress;
 import java.net.InetSocketAddress;
 import java.net.SocketTimeoutException;
@@ -95,4 +96,9 @@ public class SocketChannelSender extends ChannelSender {
         socketChannelOutput = null;
         channel = null;
     }
+
+    public OutputStream getOutputStream() {
+        return socketChannelOutput;
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/f87d2a2f/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutTCP.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutTCP.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutTCP.java
index 34f6277..75165d7 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutTCP.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutTCP.java
@@ -16,14 +16,7 @@
  */
 package org.apache.nifi.processors.standard;
 
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.nio.charset.Charset;
-import java.util.Arrays;
-import java.util.List;
-import java.util.concurrent.TimeUnit;
-
+import org.apache.commons.io.IOUtils;
 import org.apache.nifi.annotation.behavior.InputRequirement;
 import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
 import org.apache.nifi.annotation.behavior.TriggerWhenEmpty;
@@ -37,14 +30,21 @@ import org.apache.nifi.processor.ProcessContext;
 import org.apache.nifi.processor.ProcessSession;
 import org.apache.nifi.processor.ProcessSessionFactory;
 import org.apache.nifi.processor.exception.ProcessException;
-import org.apache.nifi.processor.io.InputStreamCallback;
 import org.apache.nifi.processor.util.put.AbstractPutEventProcessor;
 import org.apache.nifi.processor.util.put.sender.ChannelSender;
+import org.apache.nifi.processor.util.put.sender.SocketChannelSender;
 import org.apache.nifi.ssl.SSLContextService;
-import org.apache.nifi.stream.io.StreamUtils;
 import org.apache.nifi.util.StopWatch;
 
 import javax.net.ssl.SSLContext;
+import java.io.BufferedInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.nio.charset.Charset;
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
 
 /**
  * <p>
@@ -178,15 +178,34 @@ public class PutTCP extends AbstractPutEventProcessor {
             return;
         }
 
+        // really shouldn't happen since we know the protocol is TCP here, but this is more graceful so we
+        // can cast to a SocketChannelSender later in order to obtain the OutputStream
+        if (!(sender instanceof SocketChannelSender)) {
+            getLogger().error("Processor can only be used with a SocketChannelSender, but obtained: " + sender.getClass().getCanonicalName());
+            context.yield();
+            return;
+        }
+
+        boolean closeSender = isConnectionPerFlowFile(context);
         try {
-            String outgoingMessageDelimiter = getOutgoingMessageDelimiter(context, flowFile);
-            ByteArrayOutputStream content = readContent(session, flowFile);
-            if (outgoingMessageDelimiter != null) {
-                Charset charset = Charset.forName(context.getProperty(CHARSET).getValue());
-                content = appendDelimiter(content, outgoingMessageDelimiter, charset);
+            // We might keep the connection open across invocations of the processor so don't auto-close this
+            final OutputStream out = ((SocketChannelSender)sender).getOutputStream();
+            final String delimiter = getOutgoingMessageDelimiter(context, flowFile);
+
+            final StopWatch stopWatch = new StopWatch(true);
+            try (final InputStream rawIn = session.read(flowFile);
+                 final BufferedInputStream in = new BufferedInputStream(rawIn)) {
+                IOUtils.copy(in, out);
+                if (delimiter != null) {
+                    final Charset charSet = Charset.forName(context.getProperty(CHARSET).getValue());
+                    out.write(delimiter.getBytes(charSet), 0, delimiter.length());
+                }
+                out.flush();
+            } catch (final Exception e) {
+                closeSender = true;
+                throw e;
             }
-            StopWatch stopWatch = new StopWatch(true);
-            sender.send(content.toByteArray());
+
             session.getProvenanceReporter().send(flowFile, transitUri, stopWatch.getElapsed(TimeUnit.MILLISECONDS));
             session.transfer(flowFile, REL_SUCCESS);
             session.commit();
@@ -194,11 +213,10 @@ public class PutTCP extends AbstractPutEventProcessor {
             onFailure(context, session, flowFile);
             getLogger().error("Exception while handling a process session, transferring {} to failure.", new Object[] { flowFile }, e);
         } finally {
-            // If we are going to use this sender again, then relinquish it back to the pool.
-            if (!isConnectionPerFlowFile(context)) {
-                relinquishSender(sender);
-            } else {
+            if (closeSender) {
                 sender.close();
+            } else {
+                relinquishSender(sender);
             }
         }
     }
@@ -221,43 +239,6 @@ public class PutTCP extends AbstractPutEventProcessor {
     }
 
     /**
-     * Helper method to read the FlowFile content stream into a ByteArrayOutputStream object.
-     *
-     * @param session
-     *            - the current process session.
-     * @param flowFile
-     *            - the FlowFile to read the content from.
-     *
-     * @return ByteArrayOutputStream object containing the FlowFile content.
-     */
-    protected ByteArrayOutputStream readContent(final ProcessSession session, final FlowFile flowFile) {
-        final ByteArrayOutputStream baos = new ByteArrayOutputStream((int) flowFile.getSize() + 1);
-        session.read(flowFile, new InputStreamCallback() {
-            @Override
-            public void process(final InputStream in) throws IOException {
-                StreamUtils.copy(in, baos);
-            }
-        });
-
-        return baos;
-    }
-
-    /**
-     * Helper method to append a delimiter to the message contents.
-     *
-     * @param content
-     *            - the message contents.
-     * @param delimiter
-     *            - the delimiter value.
-     *
-     * @return ByteArrayOutputStream object containing the new message contents.
-     */
-    protected ByteArrayOutputStream appendDelimiter(final ByteArrayOutputStream content, final String delimiter, Charset charSet) {
-        content.write(delimiter.getBytes(charSet), 0, delimiter.length());
-        return content;
-    }
-
-    /**
      * Gets the current value of the "Connection Per FlowFile" property.
      *
      * @param context