You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by pv...@apache.org on 2022/06/14 13:47:46 UTC
[nifi] branch main updated: NIFI-9654 Added Queue Logging to ListenTCP
This is an automated email from the ASF dual-hosted git repository.
pvillard pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new 554f648f00 NIFI-9654 Added Queue Logging to ListenTCP
554f648f00 is described below
commit 554f648f003790727c4f28381e9e78d32f46cbd6
Author: exceptionfactory <ex...@apache.org>
AuthorDate: Fri Jun 10 08:36:15 2022 -0500
NIFI-9654 Added Queue Logging to ListenTCP
Signed-off-by: Pierre Villard <pi...@gmail.com>
This closes #6116.
---
.../apache/nifi/processors/standard/ListenTCP.java | 27 ++++++++++++++++++++--
1 file changed, 25 insertions(+), 2 deletions(-)
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenTCP.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenTCP.java
index b30ce88223..4a4ccf0ec0 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenTCP.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenTCP.java
@@ -48,6 +48,7 @@ import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processor.util.listen.EventBatcher;
import org.apache.nifi.processor.util.listen.FlowFileEventBatch;
import org.apache.nifi.processor.util.listen.ListenerProperties;
+import org.apache.nifi.processor.util.listen.queue.TrackingLinkedBlockingQueue;
import org.apache.nifi.remote.io.socket.NetworkUtils;
import org.apache.nifi.security.util.ClientAuth;
import org.apache.nifi.ssl.RestrictedSSLContextService;
@@ -57,6 +58,7 @@ import javax.net.ssl.SSLContext;
import java.io.IOException;
import java.net.InetAddress;
import java.nio.charset.Charset;
+import java.time.Instant;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
@@ -67,6 +69,7 @@ import java.util.Map;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.atomic.AtomicLong;
@SupportsBatching
@InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN)
@@ -135,10 +138,14 @@ public class ListenTCP extends AbstractProcessor {
.description("Messages received successfully will be sent out this relationship.")
.build();
+ private static final long TRACKING_LOG_INTERVAL = 60000;
+ private final AtomicLong nextTrackingLog = new AtomicLong();
+ private int eventsCapacity;
+
protected List<PropertyDescriptor> descriptors;
protected Set<Relationship> relationships;
protected volatile int port;
- protected volatile BlockingQueue<ByteArrayMessage> events;
+ protected volatile TrackingLinkedBlockingQueue<ByteArrayMessage> events;
protected volatile BlockingQueue<ByteArrayMessage> errorEvents;
protected volatile EventServer eventServer;
protected volatile byte[] messageDemarcatorBytes;
@@ -177,7 +184,8 @@ public class ListenTCP extends AbstractProcessor {
final InetAddress address = NetworkUtils.getInterfaceAddress(networkInterface);
final Charset charset = Charset.forName(context.getProperty(ListenerProperties.CHARSET).getValue());
port = context.getProperty(ListenerProperties.PORT).evaluateAttributeExpressions().asInteger();
- events = new LinkedBlockingQueue<>(context.getProperty(ListenerProperties.MAX_MESSAGE_QUEUE_SIZE).asInteger());
+ eventsCapacity = context.getProperty(ListenerProperties.MAX_MESSAGE_QUEUE_SIZE).asInteger();
+ events = new TrackingLinkedBlockingQueue<>(eventsCapacity);
errorEvents = new LinkedBlockingQueue<>();
final String msgDemarcator = getMessageDemarcator(context);
messageDemarcatorBytes = msgDemarcator.getBytes(charset);
@@ -209,6 +217,7 @@ public class ListenTCP extends AbstractProcessor {
@Override
public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
+ processTrackingLog();
final int batchSize = context.getProperty(ListenerProperties.MAX_BATCH_SIZE).asInteger();
Map<String, FlowFileEventBatch<ByteArrayMessage>> batches = getEventBatcher().getBatches(session, batchSize, messageDemarcatorBytes);
processEvents(session, batches);
@@ -313,4 +322,18 @@ public class ListenTCP extends AbstractProcessor {
attributes.put(CLIENT_CERTIFICATE_ISSUER_DN_ATTRIBUTE, sslSessionStatus.getIssuer().getName());
}
}
+
+ private void processTrackingLog() {
+ final long now = Instant.now().toEpochMilli();
+ if (now > nextTrackingLog.get()) {
+ getLogger().debug("Event Queue Capacity [{}] Remaining [{}] Size [{}] Largest Size [{}]",
+ eventsCapacity,
+ events.remainingCapacity(),
+ events.size(),
+ events.getLargestSize()
+ );
+ final long nextTrackingLogScheduled = now + TRACKING_LOG_INTERVAL;
+ nextTrackingLog.getAndSet(nextTrackingLogScheduled);
+ }
+ }
}
\ No newline at end of file