You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by th...@apache.org on 2021/11/17 16:55:47 UTC

[nifi] branch main updated: NIFI-9384 Corrected usage and generics in ListenTCP

This is an automated email from the ASF dual-hosted git repository.

thenatog pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new 0cf515c  NIFI-9384 Corrected usage and generics in ListenTCP
0cf515c is described below

commit 0cf515c9c0d58ae41218135a331ca09fe3bb4fec
Author: exceptionfactory <ex...@apache.org>
AuthorDate: Tue Nov 16 15:22:09 2021 -0600

    NIFI-9384 Corrected usage and generics in ListenTCP
    
    - Addressed compiler warnings in ListenTCP and EventBatcher
    - Adjusted ListenTCP property order to match previous version
    
    Signed-off-by: Nathan Gough <th...@gmail.com>
    
    This closes #5526.
---
 .../nifi/processor/util/listen/EventBatcher.java   | 32 ++++++++------------
 .../apache/nifi/processors/standard/ListenTCP.java | 35 ++++++++++------------
 2 files changed, 29 insertions(+), 38 deletions(-)

diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/listen/EventBatcher.java b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/listen/EventBatcher.java
index bcdb598..7a8fff2 100644
--- a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/listen/EventBatcher.java
+++ b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/listen/EventBatcher.java
@@ -20,10 +20,7 @@ import org.apache.nifi.event.transport.message.ByteArrayMessage;
 import org.apache.nifi.flowfile.FlowFile;
 import org.apache.nifi.logging.ComponentLog;
 import org.apache.nifi.processor.ProcessSession;
-import org.apache.nifi.processor.io.OutputStreamCallback;
 
-import java.io.IOException;
-import java.io.OutputStream;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.Map;
@@ -34,11 +31,11 @@ public abstract class EventBatcher<E extends ByteArrayMessage> {
 
     public static final int POLL_TIMEOUT_MS = 20;
 
-    private volatile BlockingQueue<E> events;
-    private volatile BlockingQueue<E> errorEvents;
+    private final BlockingQueue<E> events;
+    private final BlockingQueue<E> errorEvents;
     private final ComponentLog logger;
 
-    public EventBatcher(final ComponentLog logger, final BlockingQueue events, final BlockingQueue errorEvents) {
+    public EventBatcher(final ComponentLog logger, final BlockingQueue<E> events, final BlockingQueue<E> errorEvents) {
         this.logger = logger;
         this.events = events;
         this.errorEvents = errorEvents;
@@ -56,10 +53,10 @@ public abstract class EventBatcher<E extends ByteArrayMessage> {
      * @return a Map from the batch key to the FlowFile and events for that batch, the size of events in all
      * the batches will be <= batchSize
      */
-    public Map<String, FlowFileEventBatch> getBatches(final ProcessSession session, final int totalBatchSize,
+    public Map<String, FlowFileEventBatch<E>> getBatches(final ProcessSession session, final int totalBatchSize,
                                                       final byte[] messageDemarcatorBytes) {
 
-        final Map<String, FlowFileEventBatch> batches = new HashMap<String, FlowFileEventBatch>();
+        final Map<String, FlowFileEventBatch<E>> batches = new HashMap<>();
         for (int i = 0; i < totalBatchSize; i++) {
             final E event = getMessage(true, true, session);
             if (event == null) {
@@ -67,11 +64,11 @@ public abstract class EventBatcher<E extends ByteArrayMessage> {
             }
 
             final String batchKey = getBatchKey(event);
-            FlowFileEventBatch batch = batches.get(batchKey);
+            FlowFileEventBatch<E> batch = batches.get(batchKey);
 
             // if we don't have a batch for this key then create a new one
             if (batch == null) {
-                batch = new FlowFileEventBatch(session.create(), new ArrayList<E>());
+                batch = new FlowFileEventBatch<>(session.create(), new ArrayList<>());
                 batches.put(batchKey, batch);
             }
 
@@ -82,15 +79,12 @@ public abstract class EventBatcher<E extends ByteArrayMessage> {
             final boolean writeDemarcator = (i > 0);
             try {
                 final byte[] rawMessage = event.getMessage();
-                FlowFile appendedFlowFile = session.append(batch.getFlowFile(), new OutputStreamCallback() {
-                    @Override
-                    public void process(final OutputStream out) throws IOException {
-                        if (writeDemarcator) {
-                            out.write(messageDemarcatorBytes);
-                        }
-
-                        out.write(rawMessage);
+                FlowFile appendedFlowFile = session.append(batch.getFlowFile(), out -> {
+                    if (writeDemarcator) {
+                        out.write(messageDemarcatorBytes);
                     }
+
+                    out.write(rawMessage);
                 });
 
                 // update the FlowFile reference in the batch object
@@ -99,7 +93,7 @@ public abstract class EventBatcher<E extends ByteArrayMessage> {
             } catch (final Exception e) {
                 logger.error("Failed to write contents of the message to FlowFile due to {}; will re-queue message and try again",
                         e.getMessage(), e);
-                errorEvents.offer(event);
+                errorEvents.add(event);
                 break;
             }
         }
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenTCP.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenTCP.java
index eb7f668..e6f29c9 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenTCP.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenTCP.java
@@ -130,11 +130,12 @@ public class ListenTCP extends AbstractProcessor {
     protected volatile BlockingQueue<ByteArrayMessage> errorEvents;
     protected volatile EventServer eventServer;
     protected volatile byte[] messageDemarcatorBytes;
-    protected volatile EventBatcher eventBatcher;
+    protected volatile EventBatcher<ByteArrayMessage> eventBatcher;
 
     @Override
     protected void init(final ProcessorInitializationContext context) {
         final List<PropertyDescriptor> descriptors = new ArrayList<>();
+        descriptors.add(ListenerProperties.NETWORK_INTF_NAME);
         descriptors.add(ListenerProperties.PORT);
         descriptors.add(ListenerProperties.RECV_BUFFER_SIZE);
         descriptors.add(ListenerProperties.MAX_MESSAGE_QUEUE_SIZE);
@@ -148,9 +149,8 @@ public class ListenTCP extends AbstractProcessor {
         descriptors.add(MAX_RECV_THREAD_POOL_SIZE);
         // Deprecated
         descriptors.add(POOL_RECV_BUFFERS);
-        descriptors.add(ListenerProperties.NETWORK_INTF_NAME);
-        descriptors.add(CLIENT_AUTH);
         descriptors.add(SSL_CONTEXT_SERVICE);
+        descriptors.add(CLIENT_AUTH);
         this.descriptors = Collections.unmodifiableList(descriptors);
 
         final Set<Relationship> relationships = new HashSet<>();
@@ -163,14 +163,14 @@ public class ListenTCP extends AbstractProcessor {
         int maxConnections = context.getProperty(ListenerProperties.MAX_CONNECTIONS).asInteger();
         int bufferSize = context.getProperty(ListenerProperties.RECV_BUFFER_SIZE).asDataSize(DataUnit.B).intValue();
         final String networkInterface = context.getProperty(ListenerProperties.NETWORK_INTF_NAME).evaluateAttributeExpressions().getValue();
-        InetAddress hostname = NetworkUtils.getInterfaceAddress(networkInterface);
+        InetAddress address = NetworkUtils.getInterfaceAddress(networkInterface);
         Charset charset = Charset.forName(context.getProperty(ListenerProperties.CHARSET).getValue());
         port = context.getProperty(ListenerProperties.PORT).evaluateAttributeExpressions().asInteger();
         events = new LinkedBlockingQueue<>(context.getProperty(ListenerProperties.MAX_MESSAGE_QUEUE_SIZE).asInteger());
         errorEvents = new LinkedBlockingQueue<>();
         final String msgDemarcator = getMessageDemarcator(context);
         messageDemarcatorBytes = msgDemarcator.getBytes(charset);
-        final NettyEventServerFactory eventFactory = new ByteArrayMessageNettyEventServerFactory(getLogger(), hostname, port, TransportProtocol.TCP, messageDemarcatorBytes, bufferSize, events);
+        final NettyEventServerFactory eventFactory = new ByteArrayMessageNettyEventServerFactory(getLogger(), address, port, TransportProtocol.TCP, messageDemarcatorBytes, bufferSize, events);
 
         final SSLContextService sslContextService = context.getProperty(SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class);
         if (sslContextService != null) {
@@ -183,23 +183,24 @@ public class ListenTCP extends AbstractProcessor {
 
         eventFactory.setSocketReceiveBuffer(bufferSize);
         eventFactory.setWorkerThreads(maxConnections);
+        eventFactory.setThreadNamePrefix(String.format("%s[%s]", getClass().getSimpleName(), getIdentifier()));
 
         try {
             eventServer = eventFactory.getEventServer();
         } catch (EventException e) {
-            getLogger().error("Failed to bind to [{}:{}].", hostname.getHostAddress(), port);
+            getLogger().error("Failed to bind to [{}:{}]", address, port, e);
         }
     }
 
     @Override
     public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
         final int batchSize = context.getProperty(ListenerProperties.MAX_BATCH_SIZE).asInteger();
-        Map<String, FlowFileEventBatch> batches = getEventBatcher().getBatches(session, batchSize, messageDemarcatorBytes);
+        Map<String, FlowFileEventBatch<ByteArrayMessage>> batches = getEventBatcher().getBatches(session, batchSize, messageDemarcatorBytes);
         processEvents(session, batches);
     }
 
-    private void processEvents(final ProcessSession session, final Map<String, FlowFileEventBatch> batches) {
-        for (Map.Entry<String, FlowFileEventBatch> entry : batches.entrySet()) {
+    private void processEvents(final ProcessSession session, final Map<String, FlowFileEventBatch<ByteArrayMessage>> batches) {
+        for (Map.Entry<String, FlowFileEventBatch<ByteArrayMessage>> entry : batches.entrySet()) {
             FlowFile flowFile = entry.getValue().getFlowFile();
             final List<ByteArrayMessage> events = entry.getValue().getEvents();
 
@@ -245,7 +246,7 @@ public class ListenTCP extends AbstractProcessor {
         return results;
     }
 
-    protected Map<String, String> getAttributes(final FlowFileEventBatch batch) {
+    protected Map<String, String> getAttributes(final FlowFileEventBatch<ByteArrayMessage> batch) {
         final List<ByteArrayMessage> events = batch.getEvents();
         final String sender = events.get(0).getSender();
         final Map<String,String> attributes = new HashMap<>(3);
@@ -254,13 +255,11 @@ public class ListenTCP extends AbstractProcessor {
         return attributes;
     }
 
-    protected String getTransitUri(FlowFileEventBatch batch) {
+    protected String getTransitUri(final FlowFileEventBatch<ByteArrayMessage> batch) {
         final List<ByteArrayMessage> events = batch.getEvents();
         final String sender = events.get(0).getSender();
         final String senderHost = sender.startsWith("/") && sender.length() > 1 ? sender.substring(1) : sender;
-        final String transitUri = new StringBuilder().append("tcp").append("://").append(senderHost).append(":")
-                .append(port).toString();
-        return transitUri;
+        return String.format("tcp://%s:%d", senderHost, port);
     }
 
     @Override
@@ -279,17 +278,15 @@ public class ListenTCP extends AbstractProcessor {
                 .replace("\\n", "\n").replace("\\r", "\r").replace("\\t", "\t");
     }
 
-    private EventBatcher getEventBatcher() {
-        if (eventBatcher != null) {
-            return eventBatcher;
-        } else {
+    private EventBatcher<ByteArrayMessage> getEventBatcher() {
+        if (eventBatcher == null) {
             eventBatcher = new EventBatcher<ByteArrayMessage>(getLogger(), events, errorEvents) {
                 @Override
                 protected String getBatchKey(ByteArrayMessage event) {
                     return event.getSender();
                 }
             };
-            return eventBatcher;
         }
+        return eventBatcher;
     }
 }
\ No newline at end of file