You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tuweni.apache.org by to...@apache.org on 2019/07/12 21:23:12 UTC
[incubator-tuweni] branch master updated: On a server ending a
scuttlebutt RPC stream,
the client should send a final close message to close its end of the stream.
This is an automated email from the ASF dual-hosted git repository.
toulmean pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-tuweni.git
The following commit(s) were added to refs/heads/master by this push:
new 91ce130 On a server ending a scuttlebutt RPC stream, the client should send a final close message to close its end of the stream.
new 9bdf9a7 Merge pull request #25 from Happy0/stream_end
91ce130 is described below
commit 91ce130fb5825f72378f19a490683335b44b533c
Author: Gordon Martin <go...@gmail.com>
AuthorDate: Fri Jul 12 11:26:24 2019 +0100
On a server ending a scuttlebutt RPC stream, the client should send a final close message to close its end of the stream.
---
.../org/apache/tuweni/scuttlebutt/rpc/RPCCodec.java | 7 ++++++-
.../apache/tuweni/scuttlebutt/rpc/mux/RPCHandler.java | 19 +++++++++++--------
2 files changed, 17 insertions(+), 9 deletions(-)
diff --git a/scuttlebutt-rpc/src/main/java/org/apache/tuweni/scuttlebutt/rpc/RPCCodec.java b/scuttlebutt-rpc/src/main/java/org/apache/tuweni/scuttlebutt/rpc/RPCCodec.java
index 750dfd0..58ffc4c 100644
--- a/scuttlebutt-rpc/src/main/java/org/apache/tuweni/scuttlebutt/rpc/RPCCodec.java
+++ b/scuttlebutt-rpc/src/main/java/org/apache/tuweni/scuttlebutt/rpc/RPCCodec.java
@@ -143,7 +143,12 @@ public final class RPCCodec {
public static Bytes encodeStreamEndRequest(int requestNumber) throws JsonProcessingException {
Boolean bool = Boolean.TRUE;
byte[] bytes = mapper.writeValueAsBytes(bool);
- return encodeRequest(Bytes.wrap(bytes), requestNumber, RPCFlag.EndOrError.END);
+ return encodeRequest(
+ Bytes.wrap(bytes),
+ requestNumber,
+ RPCFlag.EndOrError.END,
+ RPCFlag.BodyType.JSON,
+ RPCFlag.Stream.STREAM);
}
/**
diff --git a/scuttlebutt-rpc/src/main/java/org/apache/tuweni/scuttlebutt/rpc/mux/RPCHandler.java b/scuttlebutt-rpc/src/main/java/org/apache/tuweni/scuttlebutt/rpc/mux/RPCHandler.java
index 4655862..b0f3639 100644
--- a/scuttlebutt-rpc/src/main/java/org/apache/tuweni/scuttlebutt/rpc/mux/RPCHandler.java
+++ b/scuttlebutt-rpc/src/main/java/org/apache/tuweni/scuttlebutt/rpc/mux/RPCHandler.java
@@ -122,14 +122,7 @@ public class RPCHandler implements Multiplexer, ClientHandler {
Bytes requestBytes = RPCCodec.encodeRequest(message.body(), requestNumber, rpcFlags);
Runnable closeStreamHandler = () -> {
-
- try {
- Bytes streamEnd = RPCCodec.encodeStreamEndRequest(requestNumber);
- sendBytes(streamEnd);
- } catch (JsonProcessingException e) {
- logger.warn("Unexpectedly could not encode stream end message to JSON.");
- }
-
+ endStream(requestNumber);
};
ScuttlebuttStreamHandler scuttlebuttStreamHandler = responseSink.apply(closeStreamHandler);
@@ -222,6 +215,7 @@ public class RPCHandler implements Multiplexer, ClientHandler {
if (response.isSuccessfulLastMessage()) {
streams.remove(requestNumber);
+ endStream(requestNumber);
scuttlebuttStreamHandler.onStreamEnd();
} else if (exception.isPresent()) {
scuttlebuttStreamHandler.onStreamError(exception.get());
@@ -266,4 +260,13 @@ public class RPCHandler implements Multiplexer, ClientHandler {
messageSender.accept(bytes);
}
+ private void endStream(int requestNumber) {
+ try {
+ Bytes streamEnd = RPCCodec.encodeStreamEndRequest(requestNumber);
+ sendBytes(streamEnd);
+ } catch (JsonProcessingException e) {
+ logger.warn("Unexpectedly could not encode stream end message to JSON.");
+ }
+ }
+
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@tuweni.apache.org
For additional commands, e-mail: commits-help@tuweni.apache.org