You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by pv...@apache.org on 2022/06/14 13:47:46 UTC

[nifi] branch main updated: NIFI-9654 Added Queue Logging to ListenTCP

This is an automated email from the ASF dual-hosted git repository.

pvillard pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new 554f648f00 NIFI-9654 Added Queue Logging to ListenTCP
554f648f00 is described below

commit 554f648f003790727c4f28381e9e78d32f46cbd6
Author: exceptionfactory <ex...@apache.org>
AuthorDate: Fri Jun 10 08:36:15 2022 -0500

    NIFI-9654 Added Queue Logging to ListenTCP
    
    Signed-off-by: Pierre Villard <pi...@gmail.com>
    
    This closes #6116.
---
 .../apache/nifi/processors/standard/ListenTCP.java | 27 ++++++++++++++++++++--
 1 file changed, 25 insertions(+), 2 deletions(-)

diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenTCP.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenTCP.java
index b30ce88223..4a4ccf0ec0 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenTCP.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenTCP.java
@@ -48,6 +48,7 @@ import org.apache.nifi.processor.util.StandardValidators;
 import org.apache.nifi.processor.util.listen.EventBatcher;
 import org.apache.nifi.processor.util.listen.FlowFileEventBatch;
 import org.apache.nifi.processor.util.listen.ListenerProperties;
+import org.apache.nifi.processor.util.listen.queue.TrackingLinkedBlockingQueue;
 import org.apache.nifi.remote.io.socket.NetworkUtils;
 import org.apache.nifi.security.util.ClientAuth;
 import org.apache.nifi.ssl.RestrictedSSLContextService;
@@ -57,6 +58,7 @@ import javax.net.ssl.SSLContext;
 import java.io.IOException;
 import java.net.InetAddress;
 import java.nio.charset.Charset;
+import java.time.Instant;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
@@ -67,6 +69,7 @@ import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.atomic.AtomicLong;
 
 @SupportsBatching
 @InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN)
@@ -135,10 +138,14 @@ public class ListenTCP extends AbstractProcessor {
             .description("Messages received successfully will be sent out this relationship.")
             .build();
 
+    private static final long TRACKING_LOG_INTERVAL = 60000;
+    private final AtomicLong nextTrackingLog = new AtomicLong();
+    private int eventsCapacity;
+
     protected List<PropertyDescriptor> descriptors;
     protected Set<Relationship> relationships;
     protected volatile int port;
-    protected volatile BlockingQueue<ByteArrayMessage> events;
+    protected volatile TrackingLinkedBlockingQueue<ByteArrayMessage> events;
     protected volatile BlockingQueue<ByteArrayMessage> errorEvents;
     protected volatile EventServer eventServer;
     protected volatile byte[] messageDemarcatorBytes;
@@ -177,7 +184,8 @@ public class ListenTCP extends AbstractProcessor {
         final InetAddress address = NetworkUtils.getInterfaceAddress(networkInterface);
         final Charset charset = Charset.forName(context.getProperty(ListenerProperties.CHARSET).getValue());
         port = context.getProperty(ListenerProperties.PORT).evaluateAttributeExpressions().asInteger();
-        events = new LinkedBlockingQueue<>(context.getProperty(ListenerProperties.MAX_MESSAGE_QUEUE_SIZE).asInteger());
+        eventsCapacity = context.getProperty(ListenerProperties.MAX_MESSAGE_QUEUE_SIZE).asInteger();
+        events = new TrackingLinkedBlockingQueue<>(eventsCapacity);
         errorEvents = new LinkedBlockingQueue<>();
         final String msgDemarcator = getMessageDemarcator(context);
         messageDemarcatorBytes = msgDemarcator.getBytes(charset);
@@ -209,6 +217,7 @@ public class ListenTCP extends AbstractProcessor {
 
     @Override
     public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
+        processTrackingLog();
         final int batchSize = context.getProperty(ListenerProperties.MAX_BATCH_SIZE).asInteger();
         Map<String, FlowFileEventBatch<ByteArrayMessage>> batches = getEventBatcher().getBatches(session, batchSize, messageDemarcatorBytes);
         processEvents(session, batches);
@@ -313,4 +322,18 @@ public class ListenTCP extends AbstractProcessor {
             attributes.put(CLIENT_CERTIFICATE_ISSUER_DN_ATTRIBUTE, sslSessionStatus.getIssuer().getName());
         }
     }
+
+    private void processTrackingLog() {
+        final long now = Instant.now().toEpochMilli();
+        if (now > nextTrackingLog.get()) {
+            getLogger().debug("Event Queue Capacity [{}] Remaining [{}] Size [{}] Largest Size [{}]",
+                    eventsCapacity,
+                    events.remainingCapacity(),
+                    events.size(),
+                    events.getLargestSize()
+            );
+            final long nextTrackingLogScheduled = now + TRACKING_LOG_INTERVAL;
+            nextTrackingLog.getAndSet(nextTrackingLogScheduled);
+        }
+    }
 }
\ No newline at end of file