You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by jo...@apache.org on 2021/06/11 20:15:22 UTC

[nifi] branch main updated: NIFI-8689: This closes #5150. Avoid flushing the socket buffer unnecessarily when sending a series of FlowFiles via site-to-site

This is an automated email from the ASF dual-hosted git repository.

joewitt pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new 64f600d  NIFI-8689: This closes #5150. Avoid flushing the socket buffer unnecessarily when sending a series of FlowFiles via site-to-site
64f600d is described below

commit 64f600d0ce28be3856c1abdf947d05058ea548a7
Author: Mark Payne <ma...@hotmail.com>
AuthorDate: Fri Jun 11 13:27:32 2021 -0400

    NIFI-8689: This closes #5150. Avoid flushing the socket buffer unnecessarily when sending a series of FlowFiles via site-to-site
    
    Signed-off-by: Joe Witt <jo...@apache.org>
---
 .../org/apache/nifi/remote/AbstractTransaction.java    |  9 +++++++--
 .../nifi/remote/codec/StandardFlowFileCodec.java       | 15 +++++++--------
 .../org/apache/nifi/remote/protocol/ResponseCode.java  | 18 ++++++++++++++++--
 .../remote/protocol/http/HttpClientTransaction.java    | 13 +++++++------
 .../protocol/socket/SocketClientTransaction.java       |  6 +++---
 5 files changed, 40 insertions(+), 21 deletions(-)

diff --git a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/AbstractTransaction.java b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/AbstractTransaction.java
index 826cf00..9b8920c 100644
--- a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/AbstractTransaction.java
+++ b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/AbstractTransaction.java
@@ -171,7 +171,12 @@ public abstract class AbstractTransaction implements Transaction {
     protected final void writeTransactionResponse(ResponseCode response) throws IOException {
         writeTransactionResponse(response, null);
     }
-    abstract protected void writeTransactionResponse(ResponseCode response, String explanation) throws IOException;
+
+    protected void writeTransactionResponse(ResponseCode response, String explanation) throws IOException {
+        writeTransactionResponse(response, explanation, true);
+    }
+
+    abstract protected void writeTransactionResponse(ResponseCode response, String explanation, boolean flush) throws IOException;
 
     @Override
     public final void confirm() throws IOException {
@@ -357,7 +362,7 @@ public abstract class AbstractTransaction implements Transaction {
                 }
 
                 if (transfers > 0) {
-                    writeTransactionResponse(ResponseCode.CONTINUE_TRANSACTION);
+                    writeTransactionResponse(ResponseCode.CONTINUE_TRANSACTION, null, false);
                 }
 
                 logger.debug("{} Sending data to {}", this, peer);
diff --git a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/codec/StandardFlowFileCodec.java b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/codec/StandardFlowFileCodec.java
index 0bee537..331be45 100644
--- a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/codec/StandardFlowFileCodec.java
+++ b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/codec/StandardFlowFileCodec.java
@@ -16,6 +16,13 @@
  */
 package org.apache.nifi.remote.codec;
 
+import org.apache.nifi.remote.StandardVersionNegotiator;
+import org.apache.nifi.remote.VersionNegotiator;
+import org.apache.nifi.remote.exception.ProtocolException;
+import org.apache.nifi.remote.protocol.DataPacket;
+import org.apache.nifi.remote.util.StandardDataPacket;
+import org.apache.nifi.stream.io.StreamUtils;
+
 import java.io.DataInputStream;
 import java.io.DataOutputStream;
 import java.io.EOFException;
@@ -26,13 +33,6 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
-import org.apache.nifi.remote.StandardVersionNegotiator;
-import org.apache.nifi.remote.VersionNegotiator;
-import org.apache.nifi.remote.exception.ProtocolException;
-import org.apache.nifi.remote.protocol.DataPacket;
-import org.apache.nifi.remote.util.StandardDataPacket;
-import org.apache.nifi.stream.io.StreamUtils;
-
 public class StandardFlowFileCodec implements FlowFileCodec {
 
     public static final int MAX_NUM_ATTRIBUTES = 25000;
@@ -60,7 +60,6 @@ public class StandardFlowFileCodec implements FlowFileCodec {
 
         final InputStream in = dataPacket.getData();
         StreamUtils.copy(in, encodedOut);
-        encodedOut.flush();
     }
 
     @Override
diff --git a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/ResponseCode.java b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/ResponseCode.java
index 18594e7..1f2670f 100644
--- a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/ResponseCode.java
+++ b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/ResponseCode.java
@@ -95,22 +95,36 @@ public enum ResponseCode {
     }
 
     public void writeResponse(final DataOutputStream out) throws IOException {
+        writeResponse(out, true);
+    }
+
+    public void writeResponse(final DataOutputStream out, final boolean flush) throws IOException {
         if (containsMessage()) {
             throw new IllegalArgumentException("ResponseCode " + code + " expects an explanation");
         }
 
         out.write(getCodeSequence());
-        out.flush();
+
+        if (flush) {
+            out.flush();
+        }
     }
 
     public void writeResponse(final DataOutputStream out, final String explanation) throws IOException {
+        writeResponse(out, explanation, true);
+    }
+
+    public void writeResponse(final DataOutputStream out, final String explanation, final boolean flush) throws IOException {
         if (!containsMessage()) {
             throw new IllegalArgumentException("ResponseCode " + code + " does not expect an explanation");
         }
 
         out.write(getCodeSequence());
         out.writeUTF(explanation);
-        out.flush();
+
+        if (flush) {
+            out.flush();
+        }
     }
 
     static ResponseCode readCode(final InputStream in) throws IOException, ProtocolException {
diff --git a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/http/HttpClientTransaction.java b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/http/HttpClientTransaction.java
index d41cee3..9a9ea9b 100644
--- a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/http/HttpClientTransaction.java
+++ b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/http/HttpClientTransaction.java
@@ -16,11 +16,6 @@
  */
 package org.apache.nifi.remote.protocol.http;
 
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.IOException;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.nifi.events.EventReporter;
 import org.apache.nifi.remote.AbstractTransaction;
@@ -33,6 +28,12 @@ import org.apache.nifi.remote.protocol.ResponseCode;
 import org.apache.nifi.remote.util.SiteToSiteRestApiClient;
 import org.apache.nifi.web.api.entity.TransactionResultEntity;
 
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+
 public class HttpClientTransaction extends AbstractTransaction {
 
     private SiteToSiteRestApiClient apiClient;
@@ -108,7 +109,7 @@ public class HttpClientTransaction extends AbstractTransaction {
     }
 
     @Override
-    protected void writeTransactionResponse(ResponseCode response, String explanation) throws IOException {
+    protected void writeTransactionResponse(ResponseCode response, String explanation, boolean flush) throws IOException {
         HttpCommunicationsSession commSession = (HttpCommunicationsSession) peer.getCommunicationsSession();
         if(TransferDirection.RECEIVE.equals(direction)){
             switch (response) {
diff --git a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/SocketClientTransaction.java b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/SocketClientTransaction.java
index 8b68c9e..b21f3d5 100644
--- a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/SocketClientTransaction.java
+++ b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/SocketClientTransaction.java
@@ -83,11 +83,11 @@ public class SocketClientTransaction extends AbstractTransaction {
     }
 
     @Override
-    protected void writeTransactionResponse(ResponseCode response, String explanation) throws IOException {
+    protected void writeTransactionResponse(ResponseCode response, String explanation, boolean flush) throws IOException {
         if(explanation == null){
-            response.writeResponse(dos);
+            response.writeResponse(dos, flush);
         } else {
-            response.writeResponse(dos, explanation);
+            response.writeResponse(dos, explanation, flush);
         }
     }
 }