You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tuweni.apache.org by to...@apache.org on 2019/07/12 21:23:12 UTC

[incubator-tuweni] branch master updated: On a server ending a scuttlebutt RPC stream, the client should send a final close message to close its end of the stream.

This is an automated email from the ASF dual-hosted git repository.

toulmean pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-tuweni.git


The following commit(s) were added to refs/heads/master by this push:
     new 91ce130  On a server ending a scuttlebutt RPC stream, the client should send a final close message to close its end of the stream.
     new 9bdf9a7  Merge pull request #25 from Happy0/stream_end
91ce130 is described below

commit 91ce130fb5825f72378f19a490683335b44b533c
Author: Gordon Martin <go...@gmail.com>
AuthorDate: Fri Jul 12 11:26:24 2019 +0100

    On a server ending a scuttlebutt RPC stream, the client should send a final close message to close its end of the stream.
---
 .../org/apache/tuweni/scuttlebutt/rpc/RPCCodec.java   |  7 ++++++-
 .../apache/tuweni/scuttlebutt/rpc/mux/RPCHandler.java | 19 +++++++++++--------
 2 files changed, 17 insertions(+), 9 deletions(-)

diff --git a/scuttlebutt-rpc/src/main/java/org/apache/tuweni/scuttlebutt/rpc/RPCCodec.java b/scuttlebutt-rpc/src/main/java/org/apache/tuweni/scuttlebutt/rpc/RPCCodec.java
index 750dfd0..58ffc4c 100644
--- a/scuttlebutt-rpc/src/main/java/org/apache/tuweni/scuttlebutt/rpc/RPCCodec.java
+++ b/scuttlebutt-rpc/src/main/java/org/apache/tuweni/scuttlebutt/rpc/RPCCodec.java
@@ -143,7 +143,12 @@ public final class RPCCodec {
   public static Bytes encodeStreamEndRequest(int requestNumber) throws JsonProcessingException {
     Boolean bool = Boolean.TRUE;
     byte[] bytes = mapper.writeValueAsBytes(bool);
-    return encodeRequest(Bytes.wrap(bytes), requestNumber, RPCFlag.EndOrError.END);
+    return encodeRequest(
+        Bytes.wrap(bytes),
+        requestNumber,
+        RPCFlag.EndOrError.END,
+        RPCFlag.BodyType.JSON,
+        RPCFlag.Stream.STREAM);
   }
 
   /**
diff --git a/scuttlebutt-rpc/src/main/java/org/apache/tuweni/scuttlebutt/rpc/mux/RPCHandler.java b/scuttlebutt-rpc/src/main/java/org/apache/tuweni/scuttlebutt/rpc/mux/RPCHandler.java
index 4655862..b0f3639 100644
--- a/scuttlebutt-rpc/src/main/java/org/apache/tuweni/scuttlebutt/rpc/mux/RPCHandler.java
+++ b/scuttlebutt-rpc/src/main/java/org/apache/tuweni/scuttlebutt/rpc/mux/RPCHandler.java
@@ -122,14 +122,7 @@ public class RPCHandler implements Multiplexer, ClientHandler {
       Bytes requestBytes = RPCCodec.encodeRequest(message.body(), requestNumber, rpcFlags);
 
       Runnable closeStreamHandler = () -> {
-
-        try {
-          Bytes streamEnd = RPCCodec.encodeStreamEndRequest(requestNumber);
-          sendBytes(streamEnd);
-        } catch (JsonProcessingException e) {
-          logger.warn("Unexpectedly could not encode stream end message to JSON.");
-        }
-
+        endStream(requestNumber);
       };
 
       ScuttlebuttStreamHandler scuttlebuttStreamHandler = responseSink.apply(closeStreamHandler);
@@ -222,6 +215,7 @@ public class RPCHandler implements Multiplexer, ClientHandler {
 
         if (response.isSuccessfulLastMessage()) {
           streams.remove(requestNumber);
+          endStream(requestNumber);
           scuttlebuttStreamHandler.onStreamEnd();
         } else if (exception.isPresent()) {
           scuttlebuttStreamHandler.onStreamError(exception.get());
@@ -266,4 +260,13 @@ public class RPCHandler implements Multiplexer, ClientHandler {
     messageSender.accept(bytes);
   }
 
+  private void endStream(int requestNumber) {
+    try {
+      Bytes streamEnd = RPCCodec.encodeStreamEndRequest(requestNumber);
+      sendBytes(streamEnd);
+    } catch (JsonProcessingException e) {
+      logger.warn("Unexpectedly could not encode stream end message to JSON.");
+    }
+  }
+
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@tuweni.apache.org
For additional commands, e-mail: commits-help@tuweni.apache.org