You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by jg...@apache.org on 2022/01/18 20:31:54 UTC

[nifi] branch main updated: NIFI-9571 Corrected Session commit handling in PutTCP

This is an automated email from the ASF dual-hosted git repository.

jgresock pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new 405934d  NIFI-9571 Corrected Session commit handling in PutTCP
405934d is described below

commit 405934dcd216db24a4d1009dc8e8da5f76bf165d
Author: exceptionfactory <ex...@apache.org>
AuthorDate: Thu Jan 13 13:15:50 2022 -0600

    NIFI-9571 Corrected Session commit handling in PutTCP
    
    - Added generic type to AbstractPutEventProcessor for compiler checking of event types
    - Refactored createTransitUri to shared method in AbstractPutEventProcessor
    
    Signed-off-by: Joe Gresock <jg...@gmail.com>
    
    This closes #5658.
---
 .../util/put/AbstractPutEventProcessor.java        | 29 ++++++--------
 .../apache/nifi/processors/splunk/PutSplunk.java   | 39 +++++++++----------
 .../apache/nifi/processors/standard/PutTCP.java    | 44 ++++++++--------------
 .../apache/nifi/processors/standard/PutUDP.java    | 43 ++++++++-------------
 4 files changed, 63 insertions(+), 92 deletions(-)

diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/put/AbstractPutEventProcessor.java b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/put/AbstractPutEventProcessor.java
index 659e3df..567f8fd 100644
--- a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/put/AbstractPutEventProcessor.java
+++ b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/put/AbstractPutEventProcessor.java
@@ -22,8 +22,6 @@ import org.apache.nifi.components.AllowableValue;
 import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.components.PropertyValue;
 import org.apache.nifi.event.transport.EventSender;
-import org.apache.nifi.event.transport.configuration.TransportProtocol;
-import org.apache.nifi.event.transport.netty.ByteArrayNettyEventSenderFactory;
 import org.apache.nifi.event.transport.netty.NettyEventSenderFactory;
 import org.apache.nifi.expression.ExpressionLanguageScope;
 import org.apache.nifi.flowfile.FlowFile;
@@ -52,7 +50,7 @@ import java.util.concurrent.TimeUnit;
 /**
  * A base class for processors that send data to an external system using TCP or UDP.
  */
-public abstract class AbstractPutEventProcessor extends AbstractSessionFactoryProcessor {
+public abstract class AbstractPutEventProcessor<T> extends AbstractSessionFactoryProcessor {
 
     public static final PropertyDescriptor HOSTNAME = new PropertyDescriptor.Builder()
             .name("Hostname")
@@ -164,7 +162,7 @@ public abstract class AbstractPutEventProcessor extends AbstractSessionFactoryPr
     private List<PropertyDescriptor> descriptors;
 
     protected volatile String transitUri;
-    protected EventSender eventSender;
+    protected EventSender<T> eventSender;
 
     protected final BlockingQueue<FlowFileMessageBatch> completeBatches = new LinkedBlockingQueue<>();
     protected final Set<FlowFileMessageBatch> activeBatches = Collections.synchronizedSet(new HashSet<>());
@@ -229,23 +227,20 @@ public abstract class AbstractPutEventProcessor extends AbstractSessionFactoryPr
         }
     }
 
-    /**
-     * Sub-classes construct a transit uri for provenance events. Called from @OnScheduled
-     * method of this class.
-     *
-     * @param context the current context
-     *
-     * @return the transit uri
-     */
-    protected abstract String createTransitUri(final ProcessContext context);
+    protected String createTransitUri(ProcessContext context) {
+        final String port = context.getProperty(PORT).evaluateAttributeExpressions().getValue();
+        final String host = context.getProperty(HOSTNAME).evaluateAttributeExpressions().getValue();
+        final String protocol = getProtocol(context);
+        return String.format("%s://%s:%s", protocol, host, port);
+    }
 
-    protected EventSender<?> getEventSender(final ProcessContext context) {
+    protected EventSender<T> getEventSender(final ProcessContext context) {
         final String hostname = context.getProperty(HOSTNAME).evaluateAttributeExpressions().getValue();
         final int port = context.getProperty(PORT).evaluateAttributeExpressions().asInteger();
         final String protocol = getProtocol(context);
         final boolean singleEventPerConnection = context.getProperty(CONNECTION_PER_FLOWFILE).getValue() != null ? context.getProperty(CONNECTION_PER_FLOWFILE).asBoolean() : false;
 
-        final NettyEventSenderFactory factory = getNettyEventSenderFactory(hostname, port, protocol);
+        final NettyEventSenderFactory<T> factory = getNettyEventSenderFactory(hostname, port, protocol);
         factory.setThreadNamePrefix(String.format("%s[%s]", getClass().getSimpleName(), getIdentifier()));
         factory.setWorkerThreads(context.getMaxConcurrentTasks());
         factory.setMaxConnections(context.getMaxConcurrentTasks());
@@ -473,7 +468,5 @@ public abstract class AbstractPutEventProcessor extends AbstractSessionFactoryPr
         return context.getProperty(PROTOCOL).getValue();
     }
 
-    protected NettyEventSenderFactory<?> getNettyEventSenderFactory(final String hostname, final int port, final String protocol) {
-        return new ByteArrayNettyEventSenderFactory(getLogger(), hostname, port, TransportProtocol.valueOf(protocol));
-    }
+    protected abstract NettyEventSenderFactory<T> getNettyEventSenderFactory(String hostname, int port, String protocol);
 }
diff --git a/nifi-nar-bundles/nifi-splunk-bundle/nifi-splunk-processors/src/main/java/org/apache/nifi/processors/splunk/PutSplunk.java b/nifi-nar-bundles/nifi-splunk-bundle/nifi-splunk-processors/src/main/java/org/apache/nifi/processors/splunk/PutSplunk.java
index 0a354d6..7668401 100644
--- a/nifi-nar-bundles/nifi-splunk-bundle/nifi-splunk-processors/src/main/java/org/apache/nifi/processors/splunk/PutSplunk.java
+++ b/nifi-nar-bundles/nifi-splunk-bundle/nifi-splunk-processors/src/main/java/org/apache/nifi/processors/splunk/PutSplunk.java
@@ -26,6 +26,8 @@ import java.util.Arrays;
 import java.util.Collection;
 import java.util.List;
 import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.commons.io.IOUtils;
 import org.apache.nifi.annotation.behavior.InputRequirement;
 import org.apache.nifi.annotation.behavior.TriggerWhenEmpty;
 import org.apache.nifi.annotation.documentation.CapabilityDescription;
@@ -35,6 +37,9 @@ import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.components.ValidationContext;
 import org.apache.nifi.components.ValidationResult;
 import org.apache.nifi.event.transport.EventException;
+import org.apache.nifi.event.transport.configuration.TransportProtocol;
+import org.apache.nifi.event.transport.netty.ByteArrayNettyEventSenderFactory;
+import org.apache.nifi.event.transport.netty.NettyEventSenderFactory;
 import org.apache.nifi.flowfile.FlowFile;
 import org.apache.nifi.processor.ProcessContext;
 import org.apache.nifi.processor.ProcessSession;
@@ -44,7 +49,6 @@ import org.apache.nifi.processor.io.InputStreamCallback;
 import org.apache.nifi.processor.util.put.AbstractPutEventProcessor;
 import org.apache.nifi.ssl.SSLContextService;
 import org.apache.nifi.stream.io.ByteCountingInputStream;
-import org.apache.nifi.stream.io.StreamUtils;
 import org.apache.nifi.stream.io.util.NonThreadSafeCircularBuffer;
 
 @InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
@@ -54,7 +58,7 @@ import org.apache.nifi.stream.io.util.NonThreadSafeCircularBuffer;
         "Delimiter is provided, then this processor will read messages from the incoming FlowFile based on the " +
         "delimiter, and send each message to Splunk. If a Message Delimiter is not provided then the content of " +
         "the FlowFile will be sent directly to Splunk as if it were a single message.")
-public class PutSplunk extends AbstractPutEventProcessor {
+public class PutSplunk extends AbstractPutEventProcessor<byte[]> {
 
     public static final char NEW_LINE_CHAR = '\n';
 
@@ -98,14 +102,6 @@ public class PutSplunk extends AbstractPutEventProcessor {
     }
 
     @Override
-    protected String createTransitUri(ProcessContext context) {
-        final String port = context.getProperty(PORT).evaluateAttributeExpressions().getValue();
-        final String host = context.getProperty(HOSTNAME).evaluateAttributeExpressions().getValue();
-        final String protocol = context.getProperty(PROTOCOL).getValue().toLowerCase();
-        return new StringBuilder().append(protocol).append("://").append(host).append(":").append(port).toString();
-    }
-
-    @Override
     public void onTrigger(ProcessContext context, ProcessSessionFactory sessionFactory) throws ProcessException {
         // first complete any batches from previous executions
         FlowFileMessageBatch batch;
@@ -140,22 +136,19 @@ public class PutSplunk extends AbstractPutEventProcessor {
         }
     }
 
+    @Override
+    protected NettyEventSenderFactory<byte[]> getNettyEventSenderFactory(final String hostname, final int port, final String protocol) {
+        return new ByteArrayNettyEventSenderFactory(getLogger(), hostname, port, TransportProtocol.valueOf(protocol));
+    }
+
     /**
      * Send the entire FlowFile as a single message.
      */
     private void processSingleMessage(final ProcessContext context, final ProcessSession session, final FlowFile flowFile) {
-        // copy the contents of the FlowFile to the ByteArrayOutputStream
-        final ByteArrayOutputStream baos = new ByteArrayOutputStream((int)flowFile.getSize() + 1);
-        session.read(flowFile, new InputStreamCallback() {
-            @Override
-            public void process(final InputStream in) throws IOException {
-                StreamUtils.copy(in, baos);
-            }
-        });
+        byte[] buf = readFlowFile(session, flowFile);
 
         // if TCP and we don't end in a new line then add one
         final String protocol = context.getProperty(PROTOCOL).getValue();
-        byte[] buf = baos.toByteArray();
         if (protocol.equals(TCP_VALUE.getValue()) && buf[buf.length - 1] != NEW_LINE_CHAR) {
             final byte[] updatedBuf = new byte[buf.length + 1];
             System.arraycopy(buf, 0, updatedBuf, 0, buf.length);
@@ -280,4 +273,12 @@ public class PutSplunk extends AbstractPutEventProcessor {
             return Arrays.copyOfRange(baos.toByteArray(), 0, length);
         }
     }
+
+    private byte[] readFlowFile(final ProcessSession session, final FlowFile flowFile) {
+        try (InputStream inputStream = session.read(flowFile)) {
+            return IOUtils.toByteArray(inputStream);
+        } catch (final IOException e) {
+            throw new ProcessException("Read FlowFile Failed", e);
+        }
+    }
 }
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutTCP.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutTCP.java
index 7bcdcc1..639d022 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutTCP.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutTCP.java
@@ -32,11 +32,9 @@ import org.apache.nifi.processor.ProcessContext;
 import org.apache.nifi.processor.ProcessSession;
 import org.apache.nifi.processor.ProcessSessionFactory;
 import org.apache.nifi.processor.exception.ProcessException;
-import org.apache.nifi.processor.io.InputStreamCallback;
 import org.apache.nifi.processor.util.put.AbstractPutEventProcessor;
 import org.apache.nifi.util.StopWatch;
 
-import java.io.IOException;
 import java.io.InputStream;
 import java.nio.charset.Charset;
 import java.util.Arrays;
@@ -53,16 +51,7 @@ import java.util.concurrent.TimeUnit;
 @SeeAlso({ListenTCP.class, PutUDP.class})
 @Tags({ "remote", "egress", "put", "tcp" })
 @SupportsBatching
-public class PutTCP extends AbstractPutEventProcessor {
-
-    @Override
-    protected String createTransitUri(final ProcessContext context) {
-        final String protocol = TCP_VALUE.getValue();
-        final String host = context.getProperty(HOSTNAME).evaluateAttributeExpressions().getValue();
-        final String port = context.getProperty(PORT).evaluateAttributeExpressions().getValue();
-
-        return new StringBuilder().append(protocol).append("://").append(host).append(":").append(port).toString();
-    }
+public class PutTCP extends AbstractPutEventProcessor<InputStream> {
 
     @Override
     protected List<PropertyDescriptor> getAdditionalProperties() {
@@ -81,28 +70,27 @@ public class PutTCP extends AbstractPutEventProcessor {
             return;
         }
 
+        final StopWatch stopWatch = new StopWatch(true);
         try {
-            StopWatch stopWatch = new StopWatch(true);
-            session.read(flowFile, new InputStreamCallback() {
-                @Override
-                public void process(final InputStream in) throws IOException {
-                    InputStream event = in;
-
-                    String delimiter = getOutgoingMessageDelimiter(context, flowFile);
-                    if (delimiter != null) {
-                        final Charset charSet = Charset.forName(context.getProperty(CHARSET).getValue());
-                        event = new DelimitedInputStream(in, delimiter.getBytes(charSet));
-                    }
+            session.read(flowFile, inputStream -> {
+                InputStream inputStreamEvent = inputStream;
 
-                    eventSender.sendEvent(event);
+                final String delimiter = getOutgoingMessageDelimiter(context, flowFile);
+                if (delimiter != null) {
+                    final Charset charSet = Charset.forName(context.getProperty(CHARSET).getValue());
+                    inputStreamEvent = new DelimitedInputStream(inputStream, delimiter.getBytes(charSet));
                 }
+
+                eventSender.sendEvent(inputStreamEvent);
             });
 
             session.getProvenanceReporter().send(flowFile, transitUri, stopWatch.getElapsed(TimeUnit.MILLISECONDS));
             session.transfer(flowFile, REL_SUCCESS);
-        } catch (Exception e) {
-            getLogger().error("Exception while handling a process session, transferring {} to failure.", flowFile, e);
+            session.commitAsync();
+        } catch (final Exception e) {
+            getLogger().error("Send Failed {}", flowFile, e);
             session.transfer(session.penalize(flowFile), REL_FAILURE);
+            session.commitAsync();
             context.yield();
         }
     }
@@ -113,7 +101,7 @@ public class PutTCP extends AbstractPutEventProcessor {
     }
 
     @Override
-    protected NettyEventSenderFactory<?> getNettyEventSenderFactory(final String hostname, final int port, final String protocol) {
-        return new StreamingNettyEventSenderFactory(getLogger(), hostname, port, TransportProtocol.valueOf(protocol));
+    protected NettyEventSenderFactory<InputStream> getNettyEventSenderFactory(final String hostname, final int port, final String protocol) {
+        return new StreamingNettyEventSenderFactory(getLogger(), hostname, port, TransportProtocol.TCP);
     }
 }
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutUDP.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutUDP.java
index 9990701..b36129c 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutUDP.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutUDP.java
@@ -16,22 +16,24 @@
  */
 package org.apache.nifi.processors.standard;
 
+import org.apache.commons.io.IOUtils;
 import org.apache.nifi.annotation.behavior.InputRequirement;
 import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
 import org.apache.nifi.annotation.behavior.SupportsBatching;
 import org.apache.nifi.annotation.documentation.CapabilityDescription;
 import org.apache.nifi.annotation.documentation.SeeAlso;
 import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.event.transport.configuration.TransportProtocol;
+import org.apache.nifi.event.transport.netty.ByteArrayNettyEventSenderFactory;
+import org.apache.nifi.event.transport.netty.NettyEventSenderFactory;
 import org.apache.nifi.flowfile.FlowFile;
 import org.apache.nifi.processor.ProcessContext;
 import org.apache.nifi.processor.ProcessSession;
 import org.apache.nifi.processor.ProcessSessionFactory;
 import org.apache.nifi.processor.exception.ProcessException;
 import org.apache.nifi.processor.util.put.AbstractPutEventProcessor;
-import org.apache.nifi.stream.io.StreamUtils;
 import org.apache.nifi.util.StopWatch;
 
-import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.util.concurrent.TimeUnit;
@@ -43,23 +45,7 @@ import java.util.concurrent.TimeUnit;
 @SeeAlso({ListenUDP.class, PutTCP.class})
 @Tags({ "remote", "egress", "put", "udp" })
 @SupportsBatching
-public class PutUDP extends AbstractPutEventProcessor {
-
-    /**
-     * Creates a Universal Resource Identifier (URI) for this processor. Constructs a URI of the form UDP://host:port where the host and port values are taken from the configured property values.
-     *
-     * @param context - the current process context.
-     *
-     * @return The URI value as a String.
-     */
-    @Override
-    protected String createTransitUri(final ProcessContext context) {
-        final String protocol = UDP_VALUE.getValue();
-        final String host = context.getProperty(HOSTNAME).evaluateAttributeExpressions().getValue();
-        final String port = context.getProperty(PORT).evaluateAttributeExpressions().getValue();
-
-        return protocol + "://" + host + ":" + port;
-    }
+public class PutUDP extends AbstractPutEventProcessor<byte[]> {
 
     @Override
     public void onTrigger(final ProcessContext context, final ProcessSessionFactory sessionFactory) throws ProcessException {
@@ -69,17 +55,18 @@ public class PutUDP extends AbstractPutEventProcessor {
             return;
         }
 
+        final StopWatch stopWatch = new StopWatch(true);
         try {
-            StopWatch stopWatch = new StopWatch(true);
             final byte[] content = readContent(session, flowFile);
             eventSender.sendEvent(content);
 
             session.getProvenanceReporter().send(flowFile, transitUri, stopWatch.getElapsed(TimeUnit.MILLISECONDS));
             session.transfer(flowFile, REL_SUCCESS);
             session.commitAsync();
-        } catch (Exception e) {
-            getLogger().error("Exception while handling a process session, transferring {} to failure.", new Object[]{flowFile}, e);
+        } catch (final Exception e) {
+            getLogger().error("Send Failed {}", flowFile, e);
             session.transfer(session.penalize(flowFile), REL_FAILURE);
+            session.commitAsync();
             context.yield();
         }
     }
@@ -89,12 +76,14 @@ public class PutUDP extends AbstractPutEventProcessor {
         return UDP_VALUE.getValue();
     }
 
+    @Override
+    protected NettyEventSenderFactory<byte[]> getNettyEventSenderFactory(final String hostname, final int port, final String protocol) {
+        return new ByteArrayNettyEventSenderFactory(getLogger(), hostname, port, TransportProtocol.UDP);
+    }
+
     private byte[] readContent(final ProcessSession session, final FlowFile flowFile) throws IOException {
-        final ByteArrayOutputStream baos = new ByteArrayOutputStream((int) flowFile.getSize());
-        try (final InputStream in = session.read(flowFile)) {
-            StreamUtils.copy(in, baos);
+        try (final InputStream inputStream = session.read(flowFile)) {
+            return IOUtils.toByteArray(inputStream);
         }
-
-        return baos.toByteArray();
     }
 }