You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ma...@apache.org on 2018/01/11 16:23:21 UTC

nifi git commit: NIFI-3648 removed message copying when not in debug mode. This closes #1637.

Repository: nifi
Updated Branches:
  refs/heads/master 674c9e468 -> bcac2766b


NIFI-3648 removed message copying when not in debug mode. This closes #1637.

Signed-off-by: Mark Payne <ma...@hotmail.com>


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/bcac2766
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/bcac2766
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/bcac2766

Branch: refs/heads/master
Commit: bcac2766bce63508b7930344b9dc21ceef33bf98
Parents: 674c9e4
Author: Mike Moser <mo...@apache.org>
Authored: Thu Mar 30 14:34:26 2017 +0000
Committer: Mark Payne <ma...@hotmail.com>
Committed: Thu Jan 11 11:23:04 2018 -0500

----------------------------------------------------------------------
 .../protocol/impl/SocketProtocolListener.java   | 24 ++++++++++++--------
 1 file changed, 14 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/bcac2766/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/SocketProtocolListener.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/SocketProtocolListener.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/SocketProtocolListener.java
index 8958988..e31a547 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/SocketProtocolListener.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/SocketProtocolListener.java
@@ -24,7 +24,6 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.UUID;
 import java.util.concurrent.CopyOnWriteArrayList;
-import java.util.concurrent.TimeUnit;
 
 import org.apache.nifi.cluster.protocol.NodeIdentifier;
 import org.apache.nifi.cluster.protocol.ProtocolContext;
@@ -45,6 +44,7 @@ import org.apache.nifi.io.socket.SocketListener;
 import org.apache.nifi.reporting.Bulletin;
 import org.apache.nifi.reporting.BulletinRepository;
 import org.apache.nifi.security.util.CertificateUtils;
+import org.apache.nifi.stream.io.ByteCountingInputStream;
 import org.apache.nifi.util.StopWatch;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -121,9 +121,7 @@ public class SocketProtocolListener extends SocketListener implements ProtocolLi
 
     @Override
     public void dispatchRequest(final Socket socket) {
-        byte[] receivedMessage = null;
         String hostname = null;
-        final int maxMsgBuffer = 1024 * 1024;   // don't buffer more than 1 MB of the message
         try {
             final StopWatch stopWatch = new StopWatch(true);
             hostname = socket.getInetAddress().getHostName();
@@ -134,15 +132,21 @@ public class SocketProtocolListener extends SocketListener implements ProtocolLi
 
             // unmarshall message
             final ProtocolMessageUnmarshaller<ProtocolMessage> unmarshaller = protocolContext.createUnmarshaller();
-            final InputStream inStream = socket.getInputStream();
-            final CopyingInputStream copyingInputStream = new CopyingInputStream(inStream, maxMsgBuffer); // don't copy more than 1 MB
+            final ByteCountingInputStream countingIn = new ByteCountingInputStream(socket.getInputStream());
+            InputStream wrappedInStream = countingIn;
+            if (logger.isDebugEnabled()) {
+                final int maxMsgBuffer = 1024 * 1024;   // don't buffer more than 1 MB of the message
+                final CopyingInputStream copyingInputStream = new CopyingInputStream(wrappedInStream, maxMsgBuffer);
+                wrappedInStream = copyingInputStream;
+            }
 
             final ProtocolMessage request;
             try {
-                request = unmarshaller.unmarshal(copyingInputStream);
+                request = unmarshaller.unmarshal(wrappedInStream);
             } finally {
-                receivedMessage = copyingInputStream.getBytesRead();
-                if (logger.isDebugEnabled()) {
+                if (logger.isDebugEnabled() && wrappedInStream instanceof CopyingInputStream) {
+                    final CopyingInputStream copyingInputStream = (CopyingInputStream) wrappedInStream;
+                    byte[] receivedMessage = copyingInputStream.getBytesRead();
                     logger.debug("Received message: " + new String(receivedMessage));
                 }
             }
@@ -181,8 +185,8 @@ public class SocketProtocolListener extends SocketListener implements ProtocolLi
             stopWatch.stop();
             final NodeIdentifier nodeId = getNodeIdentifier(request);
             final String from = nodeId == null ? hostname : nodeId.toString();
-            logger.info("Finished processing request {} (type={}, length={} bytes) from {} in {} millis",
-                requestId, request.getType(), receivedMessage.length, from, stopWatch.getDuration(TimeUnit.MILLISECONDS));
+            logger.info("Finished processing request {} (type={}, length={} bytes) from {} in {}",
+                requestId, request.getType(), countingIn.getBytesRead(), from, stopWatch.getDuration());
         } catch (final IOException | ProtocolException e) {
             logger.warn("Failed processing protocol message from " + hostname + " due to " + e, e);