You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by jo...@apache.org on 2021/06/11 20:15:22 UTC
[nifi] branch main updated: NIFI-8689: This closes #5150. Avoid flushing the socket buffer unnecessarily when sending a series of FlowFiles via site-to-site
This is an automated email from the ASF dual-hosted git repository.
joewitt pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new 64f600d NIFI-8689: This closes #5150. Avoid flushing the socket buffer unnecessarily when sending a series of FlowFiles via site-to-site
64f600d is described below
commit 64f600d0ce28be3856c1abdf947d05058ea548a7
Author: Mark Payne <ma...@hotmail.com>
AuthorDate: Fri Jun 11 13:27:32 2021 -0400
NIFI-8689: This closes #5150. Avoid flushing the socket buffer unnecessarily when sending a series of FlowFiles via site-to-site
Signed-off-by: Joe Witt <jo...@apache.org>
---
.../org/apache/nifi/remote/AbstractTransaction.java | 9 +++++++--
.../nifi/remote/codec/StandardFlowFileCodec.java | 15 +++++++--------
.../org/apache/nifi/remote/protocol/ResponseCode.java | 18 ++++++++++++++++--
.../remote/protocol/http/HttpClientTransaction.java | 13 +++++++------
.../protocol/socket/SocketClientTransaction.java | 6 +++---
5 files changed, 40 insertions(+), 21 deletions(-)
diff --git a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/AbstractTransaction.java b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/AbstractTransaction.java
index 826cf00..9b8920c 100644
--- a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/AbstractTransaction.java
+++ b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/AbstractTransaction.java
@@ -171,7 +171,12 @@ public abstract class AbstractTransaction implements Transaction {
protected final void writeTransactionResponse(ResponseCode response) throws IOException {
writeTransactionResponse(response, null);
}
- abstract protected void writeTransactionResponse(ResponseCode response, String explanation) throws IOException;
+
+ protected void writeTransactionResponse(ResponseCode response, String explanation) throws IOException {
+ writeTransactionResponse(response, explanation, true);
+ }
+
+ abstract protected void writeTransactionResponse(ResponseCode response, String explanation, boolean flush) throws IOException;
@Override
public final void confirm() throws IOException {
@@ -357,7 +362,7 @@ public abstract class AbstractTransaction implements Transaction {
}
if (transfers > 0) {
- writeTransactionResponse(ResponseCode.CONTINUE_TRANSACTION);
+ writeTransactionResponse(ResponseCode.CONTINUE_TRANSACTION, null, false);
}
logger.debug("{} Sending data to {}", this, peer);
diff --git a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/codec/StandardFlowFileCodec.java b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/codec/StandardFlowFileCodec.java
index 0bee537..331be45 100644
--- a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/codec/StandardFlowFileCodec.java
+++ b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/codec/StandardFlowFileCodec.java
@@ -16,6 +16,13 @@
*/
package org.apache.nifi.remote.codec;
+import org.apache.nifi.remote.StandardVersionNegotiator;
+import org.apache.nifi.remote.VersionNegotiator;
+import org.apache.nifi.remote.exception.ProtocolException;
+import org.apache.nifi.remote.protocol.DataPacket;
+import org.apache.nifi.remote.util.StandardDataPacket;
+import org.apache.nifi.stream.io.StreamUtils;
+
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.EOFException;
@@ -26,13 +33,6 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import org.apache.nifi.remote.StandardVersionNegotiator;
-import org.apache.nifi.remote.VersionNegotiator;
-import org.apache.nifi.remote.exception.ProtocolException;
-import org.apache.nifi.remote.protocol.DataPacket;
-import org.apache.nifi.remote.util.StandardDataPacket;
-import org.apache.nifi.stream.io.StreamUtils;
-
public class StandardFlowFileCodec implements FlowFileCodec {
public static final int MAX_NUM_ATTRIBUTES = 25000;
@@ -60,7 +60,6 @@ public class StandardFlowFileCodec implements FlowFileCodec {
final InputStream in = dataPacket.getData();
StreamUtils.copy(in, encodedOut);
- encodedOut.flush();
}
@Override
diff --git a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/ResponseCode.java b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/ResponseCode.java
index 18594e7..1f2670f 100644
--- a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/ResponseCode.java
+++ b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/ResponseCode.java
@@ -95,22 +95,36 @@ public enum ResponseCode {
}
public void writeResponse(final DataOutputStream out) throws IOException {
+ writeResponse(out, true);
+ }
+
+ public void writeResponse(final DataOutputStream out, final boolean flush) throws IOException {
if (containsMessage()) {
throw new IllegalArgumentException("ResponseCode " + code + " expects an explanation");
}
out.write(getCodeSequence());
- out.flush();
+
+ if (flush) {
+ out.flush();
+ }
}
public void writeResponse(final DataOutputStream out, final String explanation) throws IOException {
+ writeResponse(out, explanation, true);
+ }
+
+ public void writeResponse(final DataOutputStream out, final String explanation, final boolean flush) throws IOException {
if (!containsMessage()) {
throw new IllegalArgumentException("ResponseCode " + code + " does not expect an explanation");
}
out.write(getCodeSequence());
out.writeUTF(explanation);
- out.flush();
+
+ if (flush) {
+ out.flush();
+ }
}
static ResponseCode readCode(final InputStream in) throws IOException, ProtocolException {
diff --git a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/http/HttpClientTransaction.java b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/http/HttpClientTransaction.java
index d41cee3..9a9ea9b 100644
--- a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/http/HttpClientTransaction.java
+++ b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/http/HttpClientTransaction.java
@@ -16,11 +16,6 @@
*/
package org.apache.nifi.remote.protocol.http;
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.IOException;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.events.EventReporter;
import org.apache.nifi.remote.AbstractTransaction;
@@ -33,6 +28,12 @@ import org.apache.nifi.remote.protocol.ResponseCode;
import org.apache.nifi.remote.util.SiteToSiteRestApiClient;
import org.apache.nifi.web.api.entity.TransactionResultEntity;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+
public class HttpClientTransaction extends AbstractTransaction {
private SiteToSiteRestApiClient apiClient;
@@ -108,7 +109,7 @@ public class HttpClientTransaction extends AbstractTransaction {
}
@Override
- protected void writeTransactionResponse(ResponseCode response, String explanation) throws IOException {
+ protected void writeTransactionResponse(ResponseCode response, String explanation, boolean flush) throws IOException {
HttpCommunicationsSession commSession = (HttpCommunicationsSession) peer.getCommunicationsSession();
if(TransferDirection.RECEIVE.equals(direction)){
switch (response) {
diff --git a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/SocketClientTransaction.java b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/SocketClientTransaction.java
index 8b68c9e..b21f3d5 100644
--- a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/SocketClientTransaction.java
+++ b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/SocketClientTransaction.java
@@ -83,11 +83,11 @@ public class SocketClientTransaction extends AbstractTransaction {
}
@Override
- protected void writeTransactionResponse(ResponseCode response, String explanation) throws IOException {
+ protected void writeTransactionResponse(ResponseCode response, String explanation, boolean flush) throws IOException {
if(explanation == null){
- response.writeResponse(dos);
+ response.writeResponse(dos, flush);
} else {
- response.writeResponse(dos, explanation);
+ response.writeResponse(dos, explanation, flush);
}
}
}