You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by pv...@apache.org on 2017/07/19 10:25:50 UTC
nifi git commit: NIFI-4157 Improvements to PutTCP
Repository: nifi
Updated Branches:
refs/heads/master 59754500d -> f87d2a2f5
NIFI-4157 Improvements to PutTCP
Signed-off-by: Bryan Bende <bb...@apache.org>
Signed-off-by: Pierre Villard <pi...@gmail.com>
This closes #1989.
Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/f87d2a2f
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/f87d2a2f
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/f87d2a2f
Branch: refs/heads/master
Commit: f87d2a2f572692a7e0a4078822539f424a0438e7
Parents: 5975450
Author: Bryan Bende <bb...@apache.org>
Authored: Thu Jul 6 15:40:50 2017 -0400
Committer: Pierre Villard <pi...@gmail.com>
Committed: Wed Jul 19 12:25:42 2017 +0200
----------------------------------------------------------------------
.../util/put/AbstractPutEventProcessor.java | 3 +-
.../util/put/sender/SSLSocketChannelSender.java | 13 ++-
.../util/put/sender/SocketChannelSender.java | 6 ++
.../apache/nifi/processors/standard/PutTCP.java | 97 ++++++++------------
4 files changed, 58 insertions(+), 61 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/nifi/blob/f87d2a2f/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/put/AbstractPutEventProcessor.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/put/AbstractPutEventProcessor.java b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/put/AbstractPutEventProcessor.java
index 65b11ff..5833819 100644
--- a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/put/AbstractPutEventProcessor.java
+++ b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/put/AbstractPutEventProcessor.java
@@ -128,9 +128,8 @@ public abstract class AbstractPutEventProcessor extends AbstractSessionFactoryPr
+ "that is transmitted over the stream so that the receiver can determine when one message ends and the next message begins. Users should "
+ "ensure that the FlowFile content does not contain the delimiter character to avoid errors. In order to use a new line character you can "
+ "enter '\\n'. For a tab character use '\\t'. Finally for a carriage return use '\\r'.")
- .required(true)
+ .required(false)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
- .defaultValue("\\n")
.expressionLanguageSupported(true)
.build();
public static final PropertyDescriptor CONNECTION_PER_FLOWFILE = new PropertyDescriptor.Builder()
http://git-wip-us.apache.org/repos/asf/nifi/blob/f87d2a2f/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/put/sender/SSLSocketChannelSender.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/put/sender/SSLSocketChannelSender.java b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/put/sender/SSLSocketChannelSender.java
index a70c9c5..70771f1 100644
--- a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/put/sender/SSLSocketChannelSender.java
+++ b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/put/sender/SSLSocketChannelSender.java
@@ -19,17 +19,20 @@ package org.apache.nifi.processor.util.put.sender;
import org.apache.commons.io.IOUtils;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.remote.io.socket.ssl.SSLSocketChannel;
+import org.apache.nifi.remote.io.socket.ssl.SSLSocketChannelOutputStream;
import javax.net.ssl.SSLContext;
import java.io.IOException;
+import java.io.OutputStream;
/**
* Sends messages over an SSLSocketChannel.
*/
public class SSLSocketChannelSender extends SocketChannelSender {
- private SSLSocketChannel sslChannel;
private SSLContext sslContext;
+ private SSLSocketChannel sslChannel;
+ private SSLSocketChannelOutputStream sslOutputStream;
public SSLSocketChannelSender(final String host,
final int port,
@@ -50,6 +53,7 @@ public class SSLSocketChannelSender extends SocketChannelSender {
// SSLSocketChannel will check if already connected so we can safely call this
sslChannel.connect();
+ sslOutputStream = new SSLSocketChannelOutputStream(sslChannel);
}
@Override
@@ -65,7 +69,14 @@ public class SSLSocketChannelSender extends SocketChannelSender {
@Override
public void close() {
super.close();
+ IOUtils.closeQuietly(sslOutputStream);
IOUtils.closeQuietly(sslChannel);
sslChannel = null;
}
+
+ @Override
+ public OutputStream getOutputStream() {
+ return sslOutputStream;
+ }
+
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/f87d2a2f/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/put/sender/SocketChannelSender.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/put/sender/SocketChannelSender.java b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/put/sender/SocketChannelSender.java
index 8d4f875..6f7796b 100644
--- a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/put/sender/SocketChannelSender.java
+++ b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/put/sender/SocketChannelSender.java
@@ -21,6 +21,7 @@ import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.remote.io.socket.SocketChannelOutputStream;
import java.io.IOException;
+import java.io.OutputStream;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.SocketTimeoutException;
@@ -95,4 +96,9 @@ public class SocketChannelSender extends ChannelSender {
socketChannelOutput = null;
channel = null;
}
+
+ public OutputStream getOutputStream() {
+ return socketChannelOutput;
+ }
+
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/f87d2a2f/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutTCP.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutTCP.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutTCP.java
index 34f6277..75165d7 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutTCP.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutTCP.java
@@ -16,14 +16,7 @@
*/
package org.apache.nifi.processors.standard;
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.nio.charset.Charset;
-import java.util.Arrays;
-import java.util.List;
-import java.util.concurrent.TimeUnit;
-
+import org.apache.commons.io.IOUtils;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
import org.apache.nifi.annotation.behavior.TriggerWhenEmpty;
@@ -37,14 +30,21 @@ import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.ProcessSessionFactory;
import org.apache.nifi.processor.exception.ProcessException;
-import org.apache.nifi.processor.io.InputStreamCallback;
import org.apache.nifi.processor.util.put.AbstractPutEventProcessor;
import org.apache.nifi.processor.util.put.sender.ChannelSender;
+import org.apache.nifi.processor.util.put.sender.SocketChannelSender;
import org.apache.nifi.ssl.SSLContextService;
-import org.apache.nifi.stream.io.StreamUtils;
import org.apache.nifi.util.StopWatch;
import javax.net.ssl.SSLContext;
+import java.io.BufferedInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.nio.charset.Charset;
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
/**
* <p>
@@ -178,15 +178,34 @@ public class PutTCP extends AbstractPutEventProcessor {
return;
}
+ // really shouldn't happen since we know the protocol is TCP here, but this is more graceful so we
+ // can cast to a SocketChannelSender later in order to obtain the OutputStream
+ if (!(sender instanceof SocketChannelSender)) {
+ getLogger().error("Processor can only be used with a SocketChannelSender, but obtained: " + sender.getClass().getCanonicalName());
+ context.yield();
+ return;
+ }
+
+ boolean closeSender = isConnectionPerFlowFile(context);
try {
- String outgoingMessageDelimiter = getOutgoingMessageDelimiter(context, flowFile);
- ByteArrayOutputStream content = readContent(session, flowFile);
- if (outgoingMessageDelimiter != null) {
- Charset charset = Charset.forName(context.getProperty(CHARSET).getValue());
- content = appendDelimiter(content, outgoingMessageDelimiter, charset);
+ // We might keep the connection open across invocations of the processor so don't auto-close this
+ final OutputStream out = ((SocketChannelSender)sender).getOutputStream();
+ final String delimiter = getOutgoingMessageDelimiter(context, flowFile);
+
+ final StopWatch stopWatch = new StopWatch(true);
+ try (final InputStream rawIn = session.read(flowFile);
+ final BufferedInputStream in = new BufferedInputStream(rawIn)) {
+ IOUtils.copy(in, out);
+ if (delimiter != null) {
+ final Charset charSet = Charset.forName(context.getProperty(CHARSET).getValue());
+ out.write(delimiter.getBytes(charSet), 0, delimiter.length());
+ }
+ out.flush();
+ } catch (final Exception e) {
+ closeSender = true;
+ throw e;
}
- StopWatch stopWatch = new StopWatch(true);
- sender.send(content.toByteArray());
+
session.getProvenanceReporter().send(flowFile, transitUri, stopWatch.getElapsed(TimeUnit.MILLISECONDS));
session.transfer(flowFile, REL_SUCCESS);
session.commit();
@@ -194,11 +213,10 @@ public class PutTCP extends AbstractPutEventProcessor {
onFailure(context, session, flowFile);
getLogger().error("Exception while handling a process session, transferring {} to failure.", new Object[] { flowFile }, e);
} finally {
- // If we are going to use this sender again, then relinquish it back to the pool.
- if (!isConnectionPerFlowFile(context)) {
- relinquishSender(sender);
- } else {
+ if (closeSender) {
sender.close();
+ } else {
+ relinquishSender(sender);
}
}
}
@@ -221,43 +239,6 @@ public class PutTCP extends AbstractPutEventProcessor {
}
/**
- * Helper method to read the FlowFile content stream into a ByteArrayOutputStream object.
- *
- * @param session
- * - the current process session.
- * @param flowFile
- * - the FlowFile to read the content from.
- *
- * @return ByteArrayOutputStream object containing the FlowFile content.
- */
- protected ByteArrayOutputStream readContent(final ProcessSession session, final FlowFile flowFile) {
- final ByteArrayOutputStream baos = new ByteArrayOutputStream((int) flowFile.getSize() + 1);
- session.read(flowFile, new InputStreamCallback() {
- @Override
- public void process(final InputStream in) throws IOException {
- StreamUtils.copy(in, baos);
- }
- });
-
- return baos;
- }
-
- /**
- * Helper method to append a delimiter to the message contents.
- *
- * @param content
- * - the message contents.
- * @param delimiter
- * - the delimiter value.
- *
- * @return ByteArrayOutputStream object containing the new message contents.
- */
- protected ByteArrayOutputStream appendDelimiter(final ByteArrayOutputStream content, final String delimiter, Charset charSet) {
- content.write(delimiter.getBytes(charSet), 0, delimiter.length());
- return content;
- }
-
- /**
* Gets the current value of the "Connection Per FlowFile" property.
*
* @param context