You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by jg...@apache.org on 2022/01/18 20:31:54 UTC
[nifi] branch main updated: NIFI-9571 Corrected Session commit handling in PutTCP
This is an automated email from the ASF dual-hosted git repository.
jgresock pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new 405934d NIFI-9571 Corrected Session commit handling in PutTCP
405934d is described below
commit 405934dcd216db24a4d1009dc8e8da5f76bf165d
Author: exceptionfactory <ex...@apache.org>
AuthorDate: Thu Jan 13 13:15:50 2022 -0600
NIFI-9571 Corrected Session commit handling in PutTCP
- Added generic type to AbstractPutEventProcessor for compiler checking of event types
- Refactored createTransitUri to shared method in AbstractPutEventProcessor
Signed-off-by: Joe Gresock <jg...@gmail.com>
This closes #5658.
---
.../util/put/AbstractPutEventProcessor.java | 29 ++++++--------
.../apache/nifi/processors/splunk/PutSplunk.java | 39 +++++++++----------
.../apache/nifi/processors/standard/PutTCP.java | 44 ++++++++--------------
.../apache/nifi/processors/standard/PutUDP.java | 43 ++++++++-------------
4 files changed, 63 insertions(+), 92 deletions(-)
diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/put/AbstractPutEventProcessor.java b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/put/AbstractPutEventProcessor.java
index 659e3df..567f8fd 100644
--- a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/put/AbstractPutEventProcessor.java
+++ b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/put/AbstractPutEventProcessor.java
@@ -22,8 +22,6 @@ import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.PropertyValue;
import org.apache.nifi.event.transport.EventSender;
-import org.apache.nifi.event.transport.configuration.TransportProtocol;
-import org.apache.nifi.event.transport.netty.ByteArrayNettyEventSenderFactory;
import org.apache.nifi.event.transport.netty.NettyEventSenderFactory;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
@@ -52,7 +50,7 @@ import java.util.concurrent.TimeUnit;
/**
* A base class for processors that send data to an external system using TCP or UDP.
*/
-public abstract class AbstractPutEventProcessor extends AbstractSessionFactoryProcessor {
+public abstract class AbstractPutEventProcessor<T> extends AbstractSessionFactoryProcessor {
public static final PropertyDescriptor HOSTNAME = new PropertyDescriptor.Builder()
.name("Hostname")
@@ -164,7 +162,7 @@ public abstract class AbstractPutEventProcessor extends AbstractSessionFactoryPr
private List<PropertyDescriptor> descriptors;
protected volatile String transitUri;
- protected EventSender eventSender;
+ protected EventSender<T> eventSender;
protected final BlockingQueue<FlowFileMessageBatch> completeBatches = new LinkedBlockingQueue<>();
protected final Set<FlowFileMessageBatch> activeBatches = Collections.synchronizedSet(new HashSet<>());
@@ -229,23 +227,20 @@ public abstract class AbstractPutEventProcessor extends AbstractSessionFactoryPr
}
}
- /**
- * Sub-classes construct a transit uri for provenance events. Called from @OnScheduled
- * method of this class.
- *
- * @param context the current context
- *
- * @return the transit uri
- */
- protected abstract String createTransitUri(final ProcessContext context);
+ protected String createTransitUri(ProcessContext context) {
+ final String port = context.getProperty(PORT).evaluateAttributeExpressions().getValue();
+ final String host = context.getProperty(HOSTNAME).evaluateAttributeExpressions().getValue();
+ final String protocol = getProtocol(context);
+ return String.format("%s://%s:%s", protocol, host, port);
+ }
- protected EventSender<?> getEventSender(final ProcessContext context) {
+ protected EventSender<T> getEventSender(final ProcessContext context) {
final String hostname = context.getProperty(HOSTNAME).evaluateAttributeExpressions().getValue();
final int port = context.getProperty(PORT).evaluateAttributeExpressions().asInteger();
final String protocol = getProtocol(context);
final boolean singleEventPerConnection = context.getProperty(CONNECTION_PER_FLOWFILE).getValue() != null ? context.getProperty(CONNECTION_PER_FLOWFILE).asBoolean() : false;
- final NettyEventSenderFactory factory = getNettyEventSenderFactory(hostname, port, protocol);
+ final NettyEventSenderFactory<T> factory = getNettyEventSenderFactory(hostname, port, protocol);
factory.setThreadNamePrefix(String.format("%s[%s]", getClass().getSimpleName(), getIdentifier()));
factory.setWorkerThreads(context.getMaxConcurrentTasks());
factory.setMaxConnections(context.getMaxConcurrentTasks());
@@ -473,7 +468,5 @@ public abstract class AbstractPutEventProcessor extends AbstractSessionFactoryPr
return context.getProperty(PROTOCOL).getValue();
}
- protected NettyEventSenderFactory<?> getNettyEventSenderFactory(final String hostname, final int port, final String protocol) {
- return new ByteArrayNettyEventSenderFactory(getLogger(), hostname, port, TransportProtocol.valueOf(protocol));
- }
+ protected abstract NettyEventSenderFactory<T> getNettyEventSenderFactory(String hostname, int port, String protocol);
}
diff --git a/nifi-nar-bundles/nifi-splunk-bundle/nifi-splunk-processors/src/main/java/org/apache/nifi/processors/splunk/PutSplunk.java b/nifi-nar-bundles/nifi-splunk-bundle/nifi-splunk-processors/src/main/java/org/apache/nifi/processors/splunk/PutSplunk.java
index 0a354d6..7668401 100644
--- a/nifi-nar-bundles/nifi-splunk-bundle/nifi-splunk-processors/src/main/java/org/apache/nifi/processors/splunk/PutSplunk.java
+++ b/nifi-nar-bundles/nifi-splunk-bundle/nifi-splunk-processors/src/main/java/org/apache/nifi/processors/splunk/PutSplunk.java
@@ -26,6 +26,8 @@ import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.commons.io.IOUtils;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.TriggerWhenEmpty;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
@@ -35,6 +37,9 @@ import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.event.transport.EventException;
+import org.apache.nifi.event.transport.configuration.TransportProtocol;
+import org.apache.nifi.event.transport.netty.ByteArrayNettyEventSenderFactory;
+import org.apache.nifi.event.transport.netty.NettyEventSenderFactory;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
@@ -44,7 +49,6 @@ import org.apache.nifi.processor.io.InputStreamCallback;
import org.apache.nifi.processor.util.put.AbstractPutEventProcessor;
import org.apache.nifi.ssl.SSLContextService;
import org.apache.nifi.stream.io.ByteCountingInputStream;
-import org.apache.nifi.stream.io.StreamUtils;
import org.apache.nifi.stream.io.util.NonThreadSafeCircularBuffer;
@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
@@ -54,7 +58,7 @@ import org.apache.nifi.stream.io.util.NonThreadSafeCircularBuffer;
"Delimiter is provided, then this processor will read messages from the incoming FlowFile based on the " +
"delimiter, and send each message to Splunk. If a Message Delimiter is not provided then the content of " +
"the FlowFile will be sent directly to Splunk as if it were a single message.")
-public class PutSplunk extends AbstractPutEventProcessor {
+public class PutSplunk extends AbstractPutEventProcessor<byte[]> {
public static final char NEW_LINE_CHAR = '\n';
@@ -98,14 +102,6 @@ public class PutSplunk extends AbstractPutEventProcessor {
}
@Override
- protected String createTransitUri(ProcessContext context) {
- final String port = context.getProperty(PORT).evaluateAttributeExpressions().getValue();
- final String host = context.getProperty(HOSTNAME).evaluateAttributeExpressions().getValue();
- final String protocol = context.getProperty(PROTOCOL).getValue().toLowerCase();
- return new StringBuilder().append(protocol).append("://").append(host).append(":").append(port).toString();
- }
-
- @Override
public void onTrigger(ProcessContext context, ProcessSessionFactory sessionFactory) throws ProcessException {
// first complete any batches from previous executions
FlowFileMessageBatch batch;
@@ -140,22 +136,19 @@ public class PutSplunk extends AbstractPutEventProcessor {
}
}
+ @Override
+ protected NettyEventSenderFactory<byte[]> getNettyEventSenderFactory(final String hostname, final int port, final String protocol) {
+ return new ByteArrayNettyEventSenderFactory(getLogger(), hostname, port, TransportProtocol.valueOf(protocol));
+ }
+
/**
* Send the entire FlowFile as a single message.
*/
private void processSingleMessage(final ProcessContext context, final ProcessSession session, final FlowFile flowFile) {
- // copy the contents of the FlowFile to the ByteArrayOutputStream
- final ByteArrayOutputStream baos = new ByteArrayOutputStream((int)flowFile.getSize() + 1);
- session.read(flowFile, new InputStreamCallback() {
- @Override
- public void process(final InputStream in) throws IOException {
- StreamUtils.copy(in, baos);
- }
- });
+ byte[] buf = readFlowFile(session, flowFile);
// if TCP and we don't end in a new line then add one
final String protocol = context.getProperty(PROTOCOL).getValue();
- byte[] buf = baos.toByteArray();
if (protocol.equals(TCP_VALUE.getValue()) && buf[buf.length - 1] != NEW_LINE_CHAR) {
final byte[] updatedBuf = new byte[buf.length + 1];
System.arraycopy(buf, 0, updatedBuf, 0, buf.length);
@@ -280,4 +273,12 @@ public class PutSplunk extends AbstractPutEventProcessor {
return Arrays.copyOfRange(baos.toByteArray(), 0, length);
}
}
+
+ private byte[] readFlowFile(final ProcessSession session, final FlowFile flowFile) {
+ try (InputStream inputStream = session.read(flowFile)) {
+ return IOUtils.toByteArray(inputStream);
+ } catch (final IOException e) {
+ throw new ProcessException("Read FlowFile Failed", e);
+ }
+ }
}
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutTCP.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutTCP.java
index 7bcdcc1..639d022 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutTCP.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutTCP.java
@@ -32,11 +32,9 @@ import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.ProcessSessionFactory;
import org.apache.nifi.processor.exception.ProcessException;
-import org.apache.nifi.processor.io.InputStreamCallback;
import org.apache.nifi.processor.util.put.AbstractPutEventProcessor;
import org.apache.nifi.util.StopWatch;
-import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.Charset;
import java.util.Arrays;
@@ -53,16 +51,7 @@ import java.util.concurrent.TimeUnit;
@SeeAlso({ListenTCP.class, PutUDP.class})
@Tags({ "remote", "egress", "put", "tcp" })
@SupportsBatching
-public class PutTCP extends AbstractPutEventProcessor {
-
- @Override
- protected String createTransitUri(final ProcessContext context) {
- final String protocol = TCP_VALUE.getValue();
- final String host = context.getProperty(HOSTNAME).evaluateAttributeExpressions().getValue();
- final String port = context.getProperty(PORT).evaluateAttributeExpressions().getValue();
-
- return new StringBuilder().append(protocol).append("://").append(host).append(":").append(port).toString();
- }
+public class PutTCP extends AbstractPutEventProcessor<InputStream> {
@Override
protected List<PropertyDescriptor> getAdditionalProperties() {
@@ -81,28 +70,27 @@ public class PutTCP extends AbstractPutEventProcessor {
return;
}
+ final StopWatch stopWatch = new StopWatch(true);
try {
- StopWatch stopWatch = new StopWatch(true);
- session.read(flowFile, new InputStreamCallback() {
- @Override
- public void process(final InputStream in) throws IOException {
- InputStream event = in;
-
- String delimiter = getOutgoingMessageDelimiter(context, flowFile);
- if (delimiter != null) {
- final Charset charSet = Charset.forName(context.getProperty(CHARSET).getValue());
- event = new DelimitedInputStream(in, delimiter.getBytes(charSet));
- }
+ session.read(flowFile, inputStream -> {
+ InputStream inputStreamEvent = inputStream;
- eventSender.sendEvent(event);
+ final String delimiter = getOutgoingMessageDelimiter(context, flowFile);
+ if (delimiter != null) {
+ final Charset charSet = Charset.forName(context.getProperty(CHARSET).getValue());
+ inputStreamEvent = new DelimitedInputStream(inputStream, delimiter.getBytes(charSet));
}
+
+ eventSender.sendEvent(inputStreamEvent);
});
session.getProvenanceReporter().send(flowFile, transitUri, stopWatch.getElapsed(TimeUnit.MILLISECONDS));
session.transfer(flowFile, REL_SUCCESS);
- } catch (Exception e) {
- getLogger().error("Exception while handling a process session, transferring {} to failure.", flowFile, e);
+ session.commitAsync();
+ } catch (final Exception e) {
+ getLogger().error("Send Failed {}", flowFile, e);
session.transfer(session.penalize(flowFile), REL_FAILURE);
+ session.commitAsync();
context.yield();
}
}
@@ -113,7 +101,7 @@ public class PutTCP extends AbstractPutEventProcessor {
}
@Override
- protected NettyEventSenderFactory<?> getNettyEventSenderFactory(final String hostname, final int port, final String protocol) {
- return new StreamingNettyEventSenderFactory(getLogger(), hostname, port, TransportProtocol.valueOf(protocol));
+ protected NettyEventSenderFactory<InputStream> getNettyEventSenderFactory(final String hostname, final int port, final String protocol) {
+ return new StreamingNettyEventSenderFactory(getLogger(), hostname, port, TransportProtocol.TCP);
}
}
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutUDP.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutUDP.java
index 9990701..b36129c 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutUDP.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutUDP.java
@@ -16,22 +16,24 @@
*/
package org.apache.nifi.processors.standard;
+import org.apache.commons.io.IOUtils;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
import org.apache.nifi.annotation.behavior.SupportsBatching;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.event.transport.configuration.TransportProtocol;
+import org.apache.nifi.event.transport.netty.ByteArrayNettyEventSenderFactory;
+import org.apache.nifi.event.transport.netty.NettyEventSenderFactory;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.ProcessSessionFactory;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.put.AbstractPutEventProcessor;
-import org.apache.nifi.stream.io.StreamUtils;
import org.apache.nifi.util.StopWatch;
-import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.concurrent.TimeUnit;
@@ -43,23 +45,7 @@ import java.util.concurrent.TimeUnit;
@SeeAlso({ListenUDP.class, PutTCP.class})
@Tags({ "remote", "egress", "put", "udp" })
@SupportsBatching
-public class PutUDP extends AbstractPutEventProcessor {
-
- /**
- * Creates a Universal Resource Identifier (URI) for this processor. Constructs a URI of the form UDP://host:port where the host and port values are taken from the configured property values.
- *
- * @param context - the current process context.
- *
- * @return The URI value as a String.
- */
- @Override
- protected String createTransitUri(final ProcessContext context) {
- final String protocol = UDP_VALUE.getValue();
- final String host = context.getProperty(HOSTNAME).evaluateAttributeExpressions().getValue();
- final String port = context.getProperty(PORT).evaluateAttributeExpressions().getValue();
-
- return protocol + "://" + host + ":" + port;
- }
+public class PutUDP extends AbstractPutEventProcessor<byte[]> {
@Override
public void onTrigger(final ProcessContext context, final ProcessSessionFactory sessionFactory) throws ProcessException {
@@ -69,17 +55,18 @@ public class PutUDP extends AbstractPutEventProcessor {
return;
}
+ final StopWatch stopWatch = new StopWatch(true);
try {
- StopWatch stopWatch = new StopWatch(true);
final byte[] content = readContent(session, flowFile);
eventSender.sendEvent(content);
session.getProvenanceReporter().send(flowFile, transitUri, stopWatch.getElapsed(TimeUnit.MILLISECONDS));
session.transfer(flowFile, REL_SUCCESS);
session.commitAsync();
- } catch (Exception e) {
- getLogger().error("Exception while handling a process session, transferring {} to failure.", new Object[]{flowFile}, e);
+ } catch (final Exception e) {
+ getLogger().error("Send Failed {}", flowFile, e);
session.transfer(session.penalize(flowFile), REL_FAILURE);
+ session.commitAsync();
context.yield();
}
}
@@ -89,12 +76,14 @@ public class PutUDP extends AbstractPutEventProcessor {
return UDP_VALUE.getValue();
}
+ @Override
+ protected NettyEventSenderFactory<byte[]> getNettyEventSenderFactory(final String hostname, final int port, final String protocol) {
+ return new ByteArrayNettyEventSenderFactory(getLogger(), hostname, port, TransportProtocol.UDP);
+ }
+
private byte[] readContent(final ProcessSession session, final FlowFile flowFile) throws IOException {
- final ByteArrayOutputStream baos = new ByteArrayOutputStream((int) flowFile.getSize());
- try (final InputStream in = session.read(flowFile)) {
- StreamUtils.copy(in, baos);
+ try (final InputStream inputStream = session.read(flowFile)) {
+ return IOUtils.toByteArray(inputStream);
}
-
- return baos.toByteArray();
}
}