You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tuweni.apache.org by to...@apache.org on 2019/04/23 19:36:35 UTC

[incubator-tuweni] 31/40: wip

This is an automated email from the ASF dual-hosted git repository.

toulmean pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-tuweni.git

commit fb27edaeeee3eaf999678b3c5202e2bebee4871b
Author: Antoine Toulme <to...@apache.org>
AuthorDate: Mon Apr 22 10:57:47 2019 -0700

    wip
---
 .../handshake/vertx/ClientHandlerFactory.java      |   4 +-
 .../vertx/SecureScuttlebuttVertxClient.java        |  20 +-
 .../vertx/SecureScuttlebuttVertxServer.java        |   2 +-
 .../handshake/vertx/VertxIntegrationTest.java      |   3 +-
 scuttlebutt-rpc/build.gradle                       |   2 +
 .../tuweni/scuttlebutt/rpc/RPCAsyncRequest.java    |  58 +++++
 .../apache/tuweni/scuttlebutt/rpc/RPCCodec.java    |  30 +++
 .../tuweni/scuttlebutt/rpc/RPCErrorBody.java       |  64 ++++++
 .../apache/tuweni/scuttlebutt/rpc/RPCFunction.java |  52 +++++
 .../apache/tuweni/scuttlebutt/rpc/RPCMessage.java  |  44 +++-
 .../tuweni/scuttlebutt/rpc/RPCRequestBody.java     |  71 ++++++
 .../tuweni/scuttlebutt/rpc/RPCRequestType.java     |  35 +++
 .../tuweni/scuttlebutt/rpc/RPCStreamRequest.java   |  57 +++++
 .../tuweni/scuttlebutt/rpc/mux/Multiplexer.java    |  59 +++++
 .../tuweni/scuttlebutt/rpc/mux/RPCHandler.java     | 250 +++++++++++++++++++++
 .../rpc/mux/ScuttlebuttStreamHandler.java          |  41 ++++
 .../mux/exceptions/ConnectionClosedException.java  |  21 ++
 .../scuttlebutt/rpc/PatchworkIntegrationTest.java  |   1 +
 .../tuweni/scuttlebutt/rpc/RPCEncodingTest.java    |   3 +-
 .../rpc/mux/PatchworkIntegrationTest.java          | 249 ++++++++++++++++++++
 20 files changed, 1045 insertions(+), 21 deletions(-)

diff --git a/scuttlebutt-handshake/src/main/java/org/apache/tuweni/scuttlebutt/handshake/vertx/ClientHandlerFactory.java b/scuttlebutt-handshake/src/main/java/org/apache/tuweni/scuttlebutt/handshake/vertx/ClientHandlerFactory.java
index e947f49..f047acd 100644
--- a/scuttlebutt-handshake/src/main/java/org/apache/tuweni/scuttlebutt/handshake/vertx/ClientHandlerFactory.java
+++ b/scuttlebutt-handshake/src/main/java/org/apache/tuweni/scuttlebutt/handshake/vertx/ClientHandlerFactory.java
@@ -19,7 +19,7 @@ import java.util.function.Consumer;
 /**
  * Factory creating stream handlers, managing client-side connections.
  */
-public interface ClientHandlerFactory {
+public interface ClientHandlerFactory<T extends ClientHandler> {
 
   /**
    * Creates a new handler associated with a valid streaming connection.
@@ -27,5 +27,5 @@ public interface ClientHandlerFactory {
    * @param sender the function to send bytes to the server
    * @param terminationFunction a function to terminate the stream properly
    */
-  ClientHandler createHandler(Consumer<Bytes> sender, Runnable terminationFunction);
+  T createHandler(Consumer<Bytes> sender, Runnable terminationFunction);
 }
diff --git a/scuttlebutt-handshake/src/main/java/org/apache/tuweni/scuttlebutt/handshake/vertx/SecureScuttlebuttVertxClient.java b/scuttlebutt-handshake/src/main/java/org/apache/tuweni/scuttlebutt/handshake/vertx/SecureScuttlebuttVertxClient.java
index c7f69e5..7ac177c 100644
--- a/scuttlebutt-handshake/src/main/java/org/apache/tuweni/scuttlebutt/handshake/vertx/SecureScuttlebuttVertxClient.java
+++ b/scuttlebutt-handshake/src/main/java/org/apache/tuweni/scuttlebutt/handshake/vertx/SecureScuttlebuttVertxClient.java
@@ -39,16 +39,16 @@ import org.logl.LoggerProvider;
  */
 public final class SecureScuttlebuttVertxClient {
 
-  private class NetSocketClientHandler {
+  private class NetSocketClientHandler<T extends ClientHandler> {
 
     private final Logger logger;
     private final NetSocket socket;
     private final SecureScuttlebuttHandshakeClient handshakeClient;
-    private final ClientHandlerFactory handlerFactory;
-    private final CompletableAsyncResult<ClientHandler> completionHandle;
+    private final ClientHandlerFactory<T> handlerFactory;
+    private final CompletableAsyncResult<T> completionHandle;
     private int handshakeCounter;
     private SecureScuttlebuttStreamClient client;
-    private ClientHandler handler;
+    private T handler;
 
     private Bytes messageBuffer = Bytes.EMPTY;
 
@@ -56,8 +56,8 @@ public final class SecureScuttlebuttVertxClient {
         Logger logger,
         NetSocket socket,
         Signature.PublicKey remotePublicKey,
-        ClientHandlerFactory handlerFactory,
-        CompletableAsyncResult<ClientHandler> completionHandle) {
+        ClientHandlerFactory<T> handlerFactory,
+        CompletableAsyncResult<T> completionHandle) {
       this.logger = logger;
       this.socket = socket;
       this.handshakeClient = SecureScuttlebuttHandshakeClient.create(keyPair, networkIdentifier, remotePublicKey);
@@ -182,19 +182,19 @@ public final class SecureScuttlebuttVertxClient {
    * @param handlerFactory the factory of handlers for connections
    * @return a handle to a new stream handler with the remote host
    */
-  public AsyncResult<ClientHandler> connectTo(
+  public <T extends ClientHandler> AsyncResult<T> connectTo(
       int port,
       String host,
       Signature.PublicKey remotePublicKey,
-      ClientHandlerFactory handlerFactory) {
+      ClientHandlerFactory<T> handlerFactory) {
     client = vertx.createNetClient(new NetClientOptions().setTcpKeepAlive(true));
-    CompletableAsyncResult<ClientHandler> completion = AsyncResult.incomplete();
+    CompletableAsyncResult<T> completion = AsyncResult.incomplete();
     client.connect(port, host, res -> {
       if (res.failed()) {
         completion.completeExceptionally(res.cause());
       } else {
         NetSocket socket = res.result();
-        new NetSocketClientHandler(
+        new NetSocketClientHandler<T>(
             loggerProvider.getLogger(host + ":" + port),
             socket,
             remotePublicKey,
diff --git a/scuttlebutt-handshake/src/main/java/org/apache/tuweni/scuttlebutt/handshake/vertx/SecureScuttlebuttVertxServer.java b/scuttlebutt-handshake/src/main/java/org/apache/tuweni/scuttlebutt/handshake/vertx/SecureScuttlebuttVertxServer.java
index abd8b82..1e82956 100644
--- a/scuttlebutt-handshake/src/main/java/org/apache/tuweni/scuttlebutt/handshake/vertx/SecureScuttlebuttVertxServer.java
+++ b/scuttlebutt-handshake/src/main/java/org/apache/tuweni/scuttlebutt/handshake/vertx/SecureScuttlebuttVertxServer.java
@@ -113,8 +113,8 @@ public final class SecureScuttlebuttVertxServer {
           }
         }
       } catch (HandshakeException | StreamException e) {
-        e.printStackTrace();
         netSocket.close();
+        throw e;
       }
     }
   }
diff --git a/scuttlebutt-handshake/src/test/java/org/apache/tuweni/scuttlebutt/handshake/vertx/VertxIntegrationTest.java b/scuttlebutt-handshake/src/test/java/org/apache/tuweni/scuttlebutt/handshake/vertx/VertxIntegrationTest.java
index 526dba3..6b32dcb 100644
--- a/scuttlebutt-handshake/src/test/java/org/apache/tuweni/scuttlebutt/handshake/vertx/VertxIntegrationTest.java
+++ b/scuttlebutt-handshake/src/test/java/org/apache/tuweni/scuttlebutt/handshake/vertx/VertxIntegrationTest.java
@@ -120,8 +120,7 @@ class VertxIntegrationTest {
 
     SecureScuttlebuttVertxClient client =
         new SecureScuttlebuttVertxClient(provider, vertx, Signature.KeyPair.random(), networkIdentifier);
-    MyClientHandler handler =
-        (MyClientHandler) client.connectTo(20000, "0.0.0.0", serverKeyPair.publicKey(), MyClientHandler::new).get();
+    MyClientHandler handler = client.connectTo(20000, "0.0.0.0", serverKeyPair.publicKey(), MyClientHandler::new).get();
 
     Thread.sleep(1000);
     assertNotNull(handler);
diff --git a/scuttlebutt-rpc/build.gradle b/scuttlebutt-rpc/build.gradle
index b6f1339..cf71a72 100644
--- a/scuttlebutt-rpc/build.gradle
+++ b/scuttlebutt-rpc/build.gradle
@@ -5,6 +5,8 @@ dependencies {
   compile project(':concurrent')
   compile project(':crypto')
   compile project(':scuttlebutt')
+  compile project(':scuttlebutt-handshake')
+  compile 'org.logl:logl-api'
 
   compile 'com.fasterxml.jackson.core:jackson-databind'
 
diff --git a/scuttlebutt-rpc/src/main/java/org/apache/tuweni/scuttlebutt/rpc/RPCAsyncRequest.java b/scuttlebutt-rpc/src/main/java/org/apache/tuweni/scuttlebutt/rpc/RPCAsyncRequest.java
new file mode 100644
index 0000000..f4281d7
--- /dev/null
+++ b/scuttlebutt-rpc/src/main/java/org/apache/tuweni/scuttlebutt/rpc/RPCAsyncRequest.java
@@ -0,0 +1,58 @@
+/*
+ * Copyright 2019 ConsenSys AG.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package net.consensys.cava.scuttlebutt.rpc;
+
+import net.consensys.cava.bytes.Bytes;
+
+import java.util.List;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+public class RPCAsyncRequest {
+
+
+  private final RPCFunction function;
+  private final List<Object> arguments;
+
+  /**
+   *
+   * @param function the function to be invoked. If the function is in a namespace, the first n-1 items in the array
+   *        are the namespace followed by the function name (e.g. 'blobs.get' becomes ['blobs', 'get']).
+   * @param arguments The arguments passed to the function being invoked. Each item can be any arbitrary object which is
+   *        JSON serializable (e.g. String, Int, list, object.)
+   *
+   */
+  public RPCAsyncRequest(RPCFunction function, List<Object> arguments) {
+    this.function = function;
+    this.arguments = arguments;
+  }
+
+  /**
+   * Encode the RPC request as bytes.
+   *
+   * @param objectMapper the object mapper to serialize the request with
+   * @return an RPC request serialized into bytes
+   * @throws JsonProcessingException thrown if there is an error while serializing the request to bytes
+   */
+  public Bytes toEncodedRpcMessage(ObjectMapper objectMapper) throws JsonProcessingException {
+    return RPCCodec.encodeRequest(
+        new RPCRequestBody(function.asList(), RPCRequestType.ASYNC, arguments).asBytes(objectMapper),
+        getRPCFlags());
+  }
+
+  public RPCFlag[] getRPCFlags() {
+    return new RPCFlag[] {RPCFlag.BodyType.JSON};
+  }
+
+}
diff --git a/scuttlebutt-rpc/src/main/java/org/apache/tuweni/scuttlebutt/rpc/RPCCodec.java b/scuttlebutt-rpc/src/main/java/org/apache/tuweni/scuttlebutt/rpc/RPCCodec.java
index ca7223d..750dfd0 100644
--- a/scuttlebutt-rpc/src/main/java/org/apache/tuweni/scuttlebutt/rpc/RPCCodec.java
+++ b/scuttlebutt-rpc/src/main/java/org/apache/tuweni/scuttlebutt/rpc/RPCCodec.java
@@ -18,6 +18,9 @@ import java.nio.ByteBuffer;
 import java.nio.charset.StandardCharsets;
 import java.util.concurrent.atomic.AtomicInteger;
 
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+
 /**
  * Encoder responsible for encoding requests.
  * <p>
@@ -27,6 +30,8 @@ public final class RPCCodec {
 
   static final AtomicInteger counter = new AtomicInteger(1);
 
+  private static ObjectMapper mapper = new ObjectMapper();
+
   private static int nextRequestNumber() {
     int requestNumber = counter.getAndIncrement();
     if (requestNumber < 1) {
@@ -82,6 +87,19 @@ public final class RPCCodec {
   }
 
   /**
+   * Encode a message as an RPC request.
+   *
+   * @param body the body to encode as an RPC request
+   * @param requestNumber the request number
+   * @param flags the flags of the RPC request (already encoded.)
+   * @return the message encoded as an RPC request
+   */
+  public static Bytes encodeRequest(Bytes body, int requestNumber, byte flags) {
+    return Bytes
+        .concatenate(Bytes.of(flags), Bytes.ofUnsignedInt(body.size()), Bytes.ofUnsignedInt(requestNumber), body);
+  }
+
+  /**
    * Encode a message as a response to a RPC request.
    * 
    * @param body the body to encode as the body of the response
@@ -117,6 +135,18 @@ public final class RPCCodec {
   }
 
   /**
+   * Encodes a message with the body and headers set in the appropriate way to end a stream.
+   *
+   * @return the end-of-stream message encoded as an RPC request
+   * @throws JsonProcessingException if the end-of-stream body cannot be serialized to JSON
+   */
+  public static Bytes encodeStreamEndRequest(int requestNumber) throws JsonProcessingException {
+    Boolean bool = Boolean.TRUE;
+    byte[] bytes = mapper.writeValueAsBytes(bool);
+    return encodeRequest(Bytes.wrap(bytes), requestNumber, RPCFlag.EndOrError.END);
+  }
+
+  /**
    * Encode a message as a response to a RPC request.
    * 
    * @param body the body to encode as the body of the response
diff --git a/scuttlebutt-rpc/src/main/java/org/apache/tuweni/scuttlebutt/rpc/RPCErrorBody.java b/scuttlebutt-rpc/src/main/java/org/apache/tuweni/scuttlebutt/rpc/RPCErrorBody.java
new file mode 100644
index 0000000..87fa5b6
--- /dev/null
+++ b/scuttlebutt-rpc/src/main/java/org/apache/tuweni/scuttlebutt/rpc/RPCErrorBody.java
@@ -0,0 +1,64 @@
+/*
+ * Copyright 2019 ConsenSys AG.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package net.consensys.cava.scuttlebutt.rpc;
+
+/**
+ * An RPC message response body which contains an error
+ */
+public class RPCErrorBody {
+
+  private String name;
+  private String message;
+  private String stack;
+
+  public RPCErrorBody() {
+
+  }
+
+  /**
+   * A description of an error that occurred while performing an RPC request.
+   *
+   * @param name the name of the error type
+   * @param message the message describing the error
+   * @param stack the stack trace from the error
+   */
+  public RPCErrorBody(String name, String message, String stack) {
+    this.name = name;
+    this.message = message;
+    this.stack = stack;
+  }
+
+  public String getName() {
+    return name;
+  }
+
+  public void setName(String name) {
+    this.name = name;
+  }
+
+  public String getMessage() {
+    return message;
+  }
+
+  public void setMessage(String message) {
+    this.message = message;
+  }
+
+  public String getStack() {
+    return stack;
+  }
+
+  public void setStack(String stack) {
+    this.stack = stack;
+  }
+}
diff --git a/scuttlebutt-rpc/src/main/java/org/apache/tuweni/scuttlebutt/rpc/RPCFunction.java b/scuttlebutt-rpc/src/main/java/org/apache/tuweni/scuttlebutt/rpc/RPCFunction.java
new file mode 100644
index 0000000..511a491
--- /dev/null
+++ b/scuttlebutt-rpc/src/main/java/org/apache/tuweni/scuttlebutt/rpc/RPCFunction.java
@@ -0,0 +1,52 @@
+/*
+ * Copyright 2019 ConsenSys AG.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package net.consensys.cava.scuttlebutt.rpc;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * A scuttlebutt RPC function namespace and name representation.
+ */
+public class RPCFunction {
+
+  private final List<String> namespace;
+  private final String functionName;
+
+  /**
+   *
+   * @param namespace the namespace of the function (e.g. ['blobs']). May be empty if there is no namespace for the
+   *        function.
+   * @param functionName the function (e.g. 'add'.)
+   */
+  public RPCFunction(List<String> namespace, String functionName) {
+    this.namespace = namespace;
+    this.functionName = functionName;
+  }
+
+  public RPCFunction(String functionName) {
+    this.namespace = new ArrayList<>();
+    this.functionName = functionName;
+  }
+
+  /**
+   * @return The list representation of the namespace and function call.
+   */
+  public List<String> asList() {
+    List<String> list = new ArrayList<>();
+    list.addAll(namespace);
+    list.add(functionName);
+    return list;
+  }
+
+}
diff --git a/scuttlebutt-rpc/src/main/java/org/apache/tuweni/scuttlebutt/rpc/RPCMessage.java b/scuttlebutt-rpc/src/main/java/org/apache/tuweni/scuttlebutt/rpc/RPCMessage.java
index a82f1fc..2f21645 100644
--- a/scuttlebutt-rpc/src/main/java/org/apache/tuweni/scuttlebutt/rpc/RPCMessage.java
+++ b/scuttlebutt-rpc/src/main/java/org/apache/tuweni/scuttlebutt/rpc/RPCMessage.java
@@ -19,14 +19,13 @@ import org.apache.tuweni.bytes.Bytes;
 import java.io.IOException;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Optional;
 
 /**
  * Decoded RPC message, making elements of the message available directly.
  */
 public final class RPCMessage {
 
-  private static final ObjectMapper mapper = new ObjectMapper();
-
   private final byte rpcFlags;
   private final boolean stream;
   private final boolean lastMessageOrError;
@@ -82,6 +81,40 @@ public final class RPCMessage {
   }
 
   /**
+   *
+   * @return true if this is the last message in a stream and it is not an error
+   */
+  public boolean isSuccessfulLastMessage() {
+    return lastMessageOrError() && asString().equals("true");
+  }
+
+  /**
+   *
+   * @return true if this is an error message response
+   */
+  public boolean isErrorMessage() {
+    return lastMessageOrError && !isSuccessfulLastMessage();
+  }
+
+  /**
+   * @param objectMapper the object mapper to deserialize with
+   * @return the RPC error response body, if this is an error response - nothing otherwise
+   */
+  public Optional<RPCErrorBody> getErrorBody(ObjectMapper objectMapper) {
+
+    if (!isErrorMessage()) {
+      // If the body of the response is 'true' or the error flag isn't set, it's a successful end condition
+      return Optional.absent();
+    } else {
+      try {
+        return Optional.of(asJSON(objectMapper, RPCErrorBody.class));
+      } catch (IOException e) {
+        return Optional.absent();
+      }
+    }
+  }
+
+  /**
    * Provides the type of the body of the message: a binary message, a UTF-8 string or a JSON message.
    * 
    * @return the type of the body: a binary message, a UTF-8 string or a JSON message
@@ -128,13 +161,14 @@ public final class RPCMessage {
 
   /**
    * Provides the body of the message, marshalled as a JSON object.
-   * 
+   *
+   * @param objectMapper the object mapper to deserialize with
    * @param clazz the JSON object class
    * @param <T> the matching JSON object class
    * @return a new instance of the JSON object class
    * @throws IOException if an error occurs during marshalling
    */
-  public <T> T asJSON(Class<T> clazz) throws IOException {
-    return mapper.readerFor(clazz).readValue(body().toArrayUnsafe());
+  public <T> T asJSON(ObjectMapper objectMapper, Class<T> clazz) throws IOException {
+    return objectMapper.readerFor(clazz).readValue(body().toArrayUnsafe());
   }
 }
diff --git a/scuttlebutt-rpc/src/main/java/org/apache/tuweni/scuttlebutt/rpc/RPCRequestBody.java b/scuttlebutt-rpc/src/main/java/org/apache/tuweni/scuttlebutt/rpc/RPCRequestBody.java
new file mode 100644
index 0000000..70ad4e3
--- /dev/null
+++ b/scuttlebutt-rpc/src/main/java/org/apache/tuweni/scuttlebutt/rpc/RPCRequestBody.java
@@ -0,0 +1,71 @@
+/*
+ * Copyright 2019 ConsenSys AG.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package net.consensys.cava.scuttlebutt.rpc;
+
+import net.consensys.cava.bytes.Bytes;
+
+import java.util.List;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+/**
+ * The request payload of an RPC request to another node. The fields are as specified in the scuttlebutt protocol docs
+ */
+public class RPCRequestBody {
+
+  private final List<String> name;
+  private final RPCRequestType type;
+  private final List<Object> args;
+
+  /**
+   *
+   * @param name the function to be invoked. If the function is in a namespace, the first n-1 items in the array are
+   *        the namespace followed by the function name (e.g. 'blobs.get' becomes ['blobs', 'get']). If the function is
+   *        not in a namespace, it is an array with one item (e.g. ['createFeedStream']).
+   * @param type the type of the request (e.g. stream or async.)
+   * @param args The args passed to the function being invoked. Each item can be any arbitrary object which is JSON
+   *        serializable (e.g. String, Int, list, object.)
+   */
+  public RPCRequestBody(List<String> name, RPCRequestType type, List<Object> args) {
+    this.name = name;
+    this.type = type;
+    this.args = args;
+  }
+
+  public List<String> getName() {
+    return name;
+  }
+
+  public RPCRequestType getType() {
+    return type;
+  }
+
+  public List<Object> getArgs() {
+    return args;
+  }
+
+  /**
+   *
+   * @param objectMapper the object mapper to serialize to bytes with
+   * @return the bytes representation of this RPC request body. The request is first encoded into JSON, then from JSON
+   *         to a byte array
+   * @throws JsonProcessingException thrown if there is a problem transforming the object to JSON.
+   */
+  public Bytes asBytes(ObjectMapper objectMapper) throws JsonProcessingException {
+    byte[] bytes = objectMapper.writeValueAsBytes(this);
+    return Bytes.wrap(bytes);
+  }
+
+
+}
diff --git a/scuttlebutt-rpc/src/main/java/org/apache/tuweni/scuttlebutt/rpc/RPCRequestType.java b/scuttlebutt-rpc/src/main/java/org/apache/tuweni/scuttlebutt/rpc/RPCRequestType.java
new file mode 100644
index 0000000..9703da4
--- /dev/null
+++ b/scuttlebutt-rpc/src/main/java/org/apache/tuweni/scuttlebutt/rpc/RPCRequestType.java
@@ -0,0 +1,35 @@
+/*
+ * Copyright 2019 ConsenSys AG.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package net.consensys.cava.scuttlebutt.rpc;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+/**
+ * The available types of Scuttlebutt RPC requests
+ */
+public enum RPCRequestType {
+
+  /**
+   * An 'async' request, which returns one result some time in the future.
+   */
+  @JsonProperty("async")
+  ASYNC,
+
+  /**
+   * A 'source' type request, which begins a stream of results
+   */
+  @JsonProperty("source")
+  SOURCE
+
+
+}
diff --git a/scuttlebutt-rpc/src/main/java/org/apache/tuweni/scuttlebutt/rpc/RPCStreamRequest.java b/scuttlebutt-rpc/src/main/java/org/apache/tuweni/scuttlebutt/rpc/RPCStreamRequest.java
new file mode 100644
index 0000000..ff198da
--- /dev/null
+++ b/scuttlebutt-rpc/src/main/java/org/apache/tuweni/scuttlebutt/rpc/RPCStreamRequest.java
@@ -0,0 +1,57 @@
+/*
+ * Copyright 2019 ConsenSys AG.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package net.consensys.cava.scuttlebutt.rpc;
+
+import net.consensys.cava.bytes.Bytes;
+
+import java.util.List;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+/**
+ * A request which returns a 'source' type result (e.g. opens up a stream that is followed by the request ID.)
+ */
+public class RPCStreamRequest {
+
+  private final RPCFunction function;
+  private final List<Object> arguments;
+
+  /**
+   * The details for the function (the name of the function and its arguments.)
+   * 
+   * @param function the function to be invoked
+   * @param arguments the arguments for the function (can be any arbitrary class which can be marshalled into JSON.)
+   */
+  public RPCStreamRequest(RPCFunction function, List<Object> arguments) {
+    this.function = function;
+    this.arguments = arguments;
+  }
+
+  /**
+   * @return The byte representation for the request after it is marshalled into a JSON string.
+   * @throws JsonProcessingException if an error was thrown while marshalling to JSON
+   */
+  public Bytes toEncodedRpcMessage(ObjectMapper mapper) throws JsonProcessingException {
+    RPCRequestBody body = new RPCRequestBody(function.asList(), RPCRequestType.SOURCE, arguments);
+    return RPCCodec.encodeRequest(body.asBytes(mapper), getRPCFlags());
+  }
+
+  /**
+   * @return The correct RPC flags for a stream request
+   */
+  public RPCFlag[] getRPCFlags() {
+    return new RPCFlag[] {RPCFlag.Stream.STREAM, RPCFlag.BodyType.JSON};
+  }
+
+}
diff --git a/scuttlebutt-rpc/src/main/java/org/apache/tuweni/scuttlebutt/rpc/mux/Multiplexer.java b/scuttlebutt-rpc/src/main/java/org/apache/tuweni/scuttlebutt/rpc/mux/Multiplexer.java
new file mode 100644
index 0000000..722dcbf
--- /dev/null
+++ b/scuttlebutt-rpc/src/main/java/org/apache/tuweni/scuttlebutt/rpc/mux/Multiplexer.java
@@ -0,0 +1,59 @@
+/*
+ * Copyright 2019 ConsenSys AG.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package net.consensys.cava.scuttlebutt.rpc.mux;
+
+import net.consensys.cava.concurrent.AsyncResult;
+import net.consensys.cava.scuttlebutt.rpc.RPCAsyncRequest;
+import net.consensys.cava.scuttlebutt.rpc.RPCMessage;
+import net.consensys.cava.scuttlebutt.rpc.RPCStreamRequest;
+import net.consensys.cava.scuttlebutt.rpc.mux.exceptions.ConnectionClosedException;
+
+import java.util.function.Function;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+
+/**
+ * Multiplexes asynchronous requests and streams across a connection to a node. Handles multiple active requests and
+ * streams across one connection.
+ */
+public interface Multiplexer {
+
+  /**
+   * Issue an 'async' type request to a node, which will eventually return a result from the node.
+   *
+   * @param request the request details
+   *
+   * @return an async result which will be completed with the result or an error if the request fails.
+   */
+  AsyncResult<RPCMessage> makeAsyncRequest(RPCAsyncRequest request);
+
+  /**
+   * Creates a request which opens a stream (e.g. a 'source' in the protocol docs.)
+   *
+   * @param request the request details
+   * @param streamFactory a function which takes a 'Runnable' which closes the stream when run, and returns a stream
+   *        handler to pass messages to
+   *
+   * @throws JsonProcessingException if the request cannot be marshalled to JSON
+   */
+  void openStream(RPCStreamRequest request, Function<Runnable, ScuttlebuttStreamHandler> streamFactory)
+      throws JsonProcessingException,
+      ConnectionClosedException;
+
+
+  /**
+   * Close the underlying connection
+   */
+  void close();
+
+}
diff --git a/scuttlebutt-rpc/src/main/java/org/apache/tuweni/scuttlebutt/rpc/mux/RPCHandler.java b/scuttlebutt-rpc/src/main/java/org/apache/tuweni/scuttlebutt/rpc/mux/RPCHandler.java
new file mode 100644
index 0000000..0ac4583
--- /dev/null
+++ b/scuttlebutt-rpc/src/main/java/org/apache/tuweni/scuttlebutt/rpc/mux/RPCHandler.java
@@ -0,0 +1,250 @@
+/*
+ * Copyright 2019 ConsenSys AG.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package net.consensys.cava.scuttlebutt.rpc.mux;
+
+import net.consensys.cava.bytes.Bytes;
+import net.consensys.cava.concurrent.AsyncResult;
+import net.consensys.cava.concurrent.CompletableAsyncResult;
+import net.consensys.cava.scuttlebutt.handshake.vertx.ClientHandler;
+import net.consensys.cava.scuttlebutt.rpc.RPCAsyncRequest;
+import net.consensys.cava.scuttlebutt.rpc.RPCCodec;
+import net.consensys.cava.scuttlebutt.rpc.RPCErrorBody;
+import net.consensys.cava.scuttlebutt.rpc.RPCFlag;
+import net.consensys.cava.scuttlebutt.rpc.RPCMessage;
+import net.consensys.cava.scuttlebutt.rpc.RPCStreamRequest;
+import net.consensys.cava.scuttlebutt.rpc.mux.exceptions.ConnectionClosedException;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.function.Consumer;
+import java.util.function.Function;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Optional;
+import org.logl.Logger;
+import org.logl.LoggerProvider;
+
+/**
+ * Handles RPC requests and responses from an active connection to a scuttlebutt node
+ *
+ * Note: the public methods on this class are synchronized so that a request is rejected if the connection has been
+ * closed before it begins and any 'in flight' requests are ended exceptionally with a 'connection closed' error without
+ * new incoming requests being added to the maps by threads.
+ *
+ * In the future, we could perhaps make the locking more fine-grained if we require a high degree of
+ * concurrency.
+ *
+ */
+public class RPCHandler implements Multiplexer, ClientHandler {
+
+  private final Consumer<Bytes> messageSender;
+  private final Logger logger;
+  private final Runnable connectionCloser;
+  private final ObjectMapper objectMapper;
+
+  private Map<Integer, CompletableAsyncResult<RPCMessage>> awaitingAsyncResponse = new HashMap<>();
+  private Map<Integer, ScuttlebuttStreamHandler> streams = new HashMap<>();
+
+  private boolean closed;
+
+  /**
+   * Makes RPC requests over a connection
+   *
+   * @param messageSender sends the request to the node
+   * @param terminationFn closes the connection
+   * @param objectMapper the objectMapper to serialize and deserialize message request and response bodies
+   * @param logger
+   */
+  public RPCHandler(
+      Consumer<Bytes> messageSender,
+      Runnable terminationFn,
+      ObjectMapper objectMapper,
+      LoggerProvider logger) {
+    this.messageSender = messageSender;
+    this.connectionCloser = terminationFn;
+    this.closed = false;
+    this.objectMapper = objectMapper;
+
+    this.logger = logger.getLogger("rpc handler");
+  }
+
+  /**
+   * Sends an 'async' type request over the connection and registers the returned result so that it is completed when
+   * the matching response arrives.
+   *
+   * @param request the request details
+   *
+   * @return a result completed with the response, or completed exceptionally with a
+   *         {@link ConnectionClosedException} if the connection is already closed, or a
+   *         {@link JsonProcessingException} if the request could not be encoded
+   */
+  @Override
+  public synchronized AsyncResult<RPCMessage> makeAsyncRequest(RPCAsyncRequest request) {
+
+    CompletableAsyncResult<RPCMessage> result = AsyncResult.incomplete();
+
+    if (closed) {
+      // Fail fast: nothing must be registered or sent once the connection has been closed.
+      result.completeExceptionally(new ConnectionClosedException());
+      return result;
+    }
+
+    try {
+      RPCMessage message = new RPCMessage(request.toEncodedRpcMessage(objectMapper));
+      int requestNumber = message.requestNumber();
+      Bytes bytes = RPCCodec.encodeRequest(message.body(), requestNumber, request.getRPCFlags());
+
+      // Register the pending result only once encoding has succeeded (so a failure leaves no stale entry), but
+      // before sending, so that the response can always be matched to it.
+      awaitingAsyncResponse.put(requestNumber, result);
+      messageSender.accept(bytes);
+
+    } catch (JsonProcessingException e) {
+      result.completeExceptionally(e);
+    }
+
+    return result;
+  }
+
+  /**
+   * Sends a stream request over the connection and registers the handler produced by {@code responseSink} under the
+   * request number, so that responses carrying that number are routed to it.
+   *
+   * @param request the request details
+   * @param responseSink receives a 'Runnable' which sends a stream end message when run, and returns the handler for
+   *        this stream
+   *
+   * @throws JsonProcessingException if the request could not be encoded
+   * @throws ConnectionClosedException if the connection has already been closed
+   */
+  @Override
+  public synchronized void openStream(
+      RPCStreamRequest request,
+      Function<Runnable, ScuttlebuttStreamHandler> responseSink) throws JsonProcessingException,
+      ConnectionClosedException {
+
+    if (closed) {
+      throw new ConnectionClosedException();
+    }
+
+    RPCFlag[] flags = request.getRPCFlags();
+    RPCMessage encodedRequest = new RPCMessage(request.toEncodedRpcMessage(objectMapper));
+    int requestNumber = encodedRequest.requestNumber();
+
+    messageSender.accept(RPCCodec.encodeRequest(encodedRequest.body(), requestNumber, flags));
+
+    // Handed to the caller so that it can end the stream from its side.
+    Runnable closeStreamHandler = () -> {
+      try {
+        messageSender.accept(RPCCodec.encodeStreamEndRequest(requestNumber));
+      } catch (JsonProcessingException e) {
+        logger.warn("Unexpectedly could not encode stream end message to JSON.");
+      }
+    };
+
+    streams.put(requestNumber, responseSink.apply(closeStreamHandler));
+  }
+
+  @Override
+  public synchronized void close() {
+    connectionCloser.run();
+  }
+
+  @Override
+  public synchronized void receivedMessage(Bytes message) {
+
+    RPCMessage rpcMessage = new RPCMessage(message);
+
+    // A negative request number indicates that this is a response, rather than a request that this node
+    // should service
+    if (rpcMessage.requestNumber() < 0) {
+      handleResponse(rpcMessage);
+    } else {
+      handleRequest(rpcMessage);
+    }
+
+  }
+
+  private void handleRequest(RPCMessage rpcMessage) {
+    // Not yet implemented
+    logger.warn("Received incoming request, but we do not yet handle any requests: " + rpcMessage.asString());
+
+  }
+
+  private void handleResponse(RPCMessage response) {
+    int requestNumber = response.requestNumber() * -1;
+
+    if (logger.isDebugEnabled()) {
+      logger.debug("Incoming response: " + response.asString());
+    }
+
+    byte rpcFlags = response.rpcFlags();
+
+    boolean isStream = RPCFlag.Stream.STREAM.isApplied(rpcFlags);
+
+    if (isStream) {
+      ScuttlebuttStreamHandler scuttlebuttStreamHandler = streams.get(requestNumber);
+
+      if (scuttlebuttStreamHandler != null) {
+
+        if (response.isSuccessfulLastMessage()) {
+          streams.remove(requestNumber);
+          scuttlebuttStreamHandler.onStreamEnd();
+        } else if (response.isErrorMessage()) {
+
+          Optional<RPCErrorBody> errorBody = response.getErrorBody(objectMapper);
+
+          if (errorBody.isPresent()) {
+            scuttlebuttStreamHandler.onStreamError(new Exception(errorBody.get().getMessage()));
+          } else {
+            // This shouldn't happen, but for safety we fall back to just writing the whole body in the exception message
+            // if we fail to marshall it for whatever reason
+            scuttlebuttStreamHandler.onStreamError(new Exception(response.asString()));
+          }
+
+        } else {
+          scuttlebuttStreamHandler.onMessage(response);
+        }
+      } else {
+        logger.warn(
+            "Couldn't find stream handler for RPC response with request number "
+                + requestNumber
+                + " "
+                + response.asString());
+      }
+
+    } else {
+
+      CompletableAsyncResult<RPCMessage> rpcMessageFuture = awaitingAsyncResponse.get(requestNumber);
+
+      if (rpcMessageFuture != null) {
+        rpcMessageFuture.complete(response);
+        awaitingAsyncResponse.remove(requestNumber);
+      } else {
+        logger.warn(
+            "Couldn't find async handler for RPC response with request number "
+                + requestNumber
+                + " "
+                + response.asString());
+      }
+    }
+
+  }
+
+  /**
+   * Marks the connection as closed and fails everything still in flight: every open stream handler receives a
+   * {@link ConnectionClosedException} and every pending async result is completed exceptionally with one.
+   *
+   * Synchronized like the other public methods, so that no request can be added to the maps while they are being
+   * drained.
+   */
+  @Override
+  public synchronized void streamClosed() {
+    this.closed = true;
+
+    streams.forEach((key, streamHandler) -> {
+      streamHandler.onStreamError(new ConnectionClosedException());
+    });
+
+    streams.clear();
+
+    awaitingAsyncResponse.forEach((key, value) -> {
+      if (!value.isDone()) {
+        value.completeExceptionally(new ConnectionClosedException());
+      }
+
+    });
+
+    // Nothing can be answered any more, so drop the references to the pending results.
+    awaitingAsyncResponse.clear();
+  }
+}
diff --git a/scuttlebutt-rpc/src/main/java/org/apache/tuweni/scuttlebutt/rpc/mux/ScuttlebuttStreamHandler.java b/scuttlebutt-rpc/src/main/java/org/apache/tuweni/scuttlebutt/rpc/mux/ScuttlebuttStreamHandler.java
new file mode 100644
index 0000000..d108663
--- /dev/null
+++ b/scuttlebutt-rpc/src/main/java/org/apache/tuweni/scuttlebutt/rpc/mux/ScuttlebuttStreamHandler.java
@@ -0,0 +1,41 @@
+/*
+ * Copyright 2019 ConsenSys AG.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package net.consensys.cava.scuttlebutt.rpc.mux;
+
+import net.consensys.cava.scuttlebutt.rpc.RPCMessage;
+
+/**
+ * Handles incoming items from a result stream
+ */
+public interface ScuttlebuttStreamHandler {
+
+  /**
+   * Handles a new message from the result stream.
+   *
+   * @param message
+   */
+  void onMessage(RPCMessage message);
+
+  /**
+   * Invoked when the stream has been closed.
+   */
+  void onStreamEnd();
+
+  /**
+   * Invoked when there is an error in the stream.
+   *
+   * @param ex the underlying error
+   */
+  void onStreamError(Exception ex);
+
+}
diff --git a/scuttlebutt-rpc/src/main/java/org/apache/tuweni/scuttlebutt/rpc/mux/exceptions/ConnectionClosedException.java b/scuttlebutt-rpc/src/main/java/org/apache/tuweni/scuttlebutt/rpc/mux/exceptions/ConnectionClosedException.java
new file mode 100644
index 0000000..160946c
--- /dev/null
+++ b/scuttlebutt-rpc/src/main/java/org/apache/tuweni/scuttlebutt/rpc/mux/exceptions/ConnectionClosedException.java
@@ -0,0 +1,21 @@
+/*
+ * Copyright 2019 ConsenSys AG.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package net.consensys.cava.scuttlebutt.rpc.mux.exceptions;
+
+public class ConnectionClosedException extends Exception {
+
+  public ConnectionClosedException() {
+    super("Connection is closed.");
+  }
+
+}
diff --git a/scuttlebutt-rpc/src/test/java/org/apache/tuweni/scuttlebutt/rpc/PatchworkIntegrationTest.java b/scuttlebutt-rpc/src/test/java/org/apache/tuweni/scuttlebutt/rpc/PatchworkIntegrationTest.java
index 63b64a8..3178fae 100644
--- a/scuttlebutt-rpc/src/test/java/org/apache/tuweni/scuttlebutt/rpc/PatchworkIntegrationTest.java
+++ b/scuttlebutt-rpc/src/test/java/org/apache/tuweni/scuttlebutt/rpc/PatchworkIntegrationTest.java
@@ -188,4 +188,5 @@ class PatchworkIntegrationTest {
 
     secureScuttlebuttVertxClient.stop().join();
   }
+
 }
diff --git a/scuttlebutt-rpc/src/test/java/org/apache/tuweni/scuttlebutt/rpc/RPCEncodingTest.java b/scuttlebutt-rpc/src/test/java/org/apache/tuweni/scuttlebutt/rpc/RPCEncodingTest.java
index 10966cf..912c1b5 100644
--- a/scuttlebutt-rpc/src/test/java/org/apache/tuweni/scuttlebutt/rpc/RPCEncodingTest.java
+++ b/scuttlebutt-rpc/src/test/java/org/apache/tuweni/scuttlebutt/rpc/RPCEncodingTest.java
@@ -21,6 +21,7 @@ import org.apache.tuweni.bytes.Bytes;
 
 import java.nio.charset.StandardCharsets;
 
+import com.fasterxml.jackson.databind.ObjectMapper;
 import org.junit.jupiter.api.Test;
 
 class RPCEncodingTest {
@@ -44,7 +45,7 @@ class RPCEncodingTest {
         RPCFlag.Stream.STREAM);
     RPCMessage decoded = new RPCMessage(message);
     assertTrue(decoded.stream());
-    assertEquals("some JSON string", decoded.asJSON(String.class));
+    assertEquals("some JSON string", decoded.asJSON(new ObjectMapper(), String.class));
     assertEquals(RPCFlag.BodyType.JSON, decoded.bodyType());
     assertEquals(RPCCodec.counter.get() - 1, decoded.requestNumber());
   }
diff --git a/scuttlebutt-rpc/src/test/java/org/apache/tuweni/scuttlebutt/rpc/mux/PatchworkIntegrationTest.java b/scuttlebutt-rpc/src/test/java/org/apache/tuweni/scuttlebutt/rpc/mux/PatchworkIntegrationTest.java
new file mode 100644
index 0000000..293ac92
--- /dev/null
+++ b/scuttlebutt-rpc/src/test/java/org/apache/tuweni/scuttlebutt/rpc/mux/PatchworkIntegrationTest.java
@@ -0,0 +1,249 @@
+/*
+ * Copyright 2019 ConsenSys AG.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package net.consensys.cava.scuttlebutt.rpc.mux;
+
+/*
+ * Copyright 2019 ConsenSys AG.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+
+import net.consensys.cava.bytes.Bytes;
+import net.consensys.cava.bytes.Bytes32;
+import net.consensys.cava.concurrent.AsyncResult;
+import net.consensys.cava.concurrent.CompletableAsyncResult;
+import net.consensys.cava.crypto.sodium.Signature;
+import net.consensys.cava.io.Base64;
+import net.consensys.cava.junit.VertxExtension;
+import net.consensys.cava.junit.VertxInstance;
+import net.consensys.cava.scuttlebutt.handshake.vertx.SecureScuttlebuttVertxClient;
+import net.consensys.cava.scuttlebutt.rpc.RPCAsyncRequest;
+import net.consensys.cava.scuttlebutt.rpc.RPCFunction;
+import net.consensys.cava.scuttlebutt.rpc.RPCMessage;
+import net.consensys.cava.scuttlebutt.rpc.RPCStreamRequest;
+import net.consensys.cava.scuttlebutt.rpc.mux.exceptions.ConnectionClosedException;
+
+import java.io.BufferedWriter;
+import java.io.File;
+import java.io.OutputStreamWriter;
+import java.io.PrintWriter;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Scanner;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Optional;
+import io.vertx.core.Vertx;
+import org.junit.jupiter.api.Disabled;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.logl.Level;
+import org.logl.LoggerProvider;
+import org.logl.logl.SimpleLogger;
+import org.logl.vertx.LoglLogDelegateFactory;
+
+@ExtendWith(VertxExtension.class)
+public class PatchworkIntegrationTest {
+
+  LoggerProvider loggerProvider = SimpleLogger.withLogLevel(Level.DEBUG).toPrintWriter(
+      new PrintWriter(new BufferedWriter(new OutputStreamWriter(System.out, UTF_8))));
+
+  @Test
+  @Disabled
+  public void testWithPatchwork(@VertxInstance Vertx vertx) throws Exception {
+
+    RPCHandler rpcHandler = makeRPCHandler(vertx);
+
+    List<AsyncResult<RPCMessage>> results = new ArrayList<>();
+
+    for (int i = 0; i < 10; i++) {
+      RPCFunction function = new RPCFunction("whoami");
+      RPCAsyncRequest asyncRequest = new RPCAsyncRequest(function, new ArrayList<>());
+
+      AsyncResult<RPCMessage> res = rpcHandler.makeAsyncRequest(asyncRequest);
+
+      results.add(res);
+    }
+
+    AsyncResult<List<RPCMessage>> allResults = AsyncResult.combine(results);
+    List<RPCMessage> rpcMessages = allResults.get();
+
+    assertEquals(10, rpcMessages.size());
+
+    rpcMessages.forEach(msg -> {
+      assertFalse(msg.lastMessageOrError());
+
+    });
+
+  }
+
+
+  // TODO: Move this to a utility class that all the scuttlebutt modules' tests can use.
+  /**
+   * Reads the local scuttlebutt identity from the 'secret' file. The directory is taken from the 'ssb_dir'
+   * environment variable, falling back to '.ssb' under the user's home directory.
+   *
+   * @return the key pair stored in the secret file
+   * @throws Exception if the directory cannot be determined, the secret file does not exist or it cannot be parsed
+   */
+  private Signature.KeyPair getLocalKeys() throws Exception {
+    Optional<String> ssbDir = Optional.fromNullable(System.getenv().get("ssb_dir"));
+    Optional<String> homePath =
+        Optional.fromNullable(System.getProperty("user.home")).transform(home -> home + "/.ssb");
+
+    Optional<String> path = ssbDir.or(homePath);
+
+    if (!path.isPresent()) {
+      throw new Exception("Cannot find ssb directory config value");
+    }
+
+    String secretPath = path.get() + "/secret";
+    File file = new File(secretPath);
+
+    if (!file.exists()) {
+      throw new Exception("Secret file does not exist");
+    }
+
+    List<String> list = new ArrayList<>();
+
+    // try-with-resources so the file handle is released even if reading fails
+    try (Scanner s = new Scanner(file, UTF_8.name())) {
+      s.useDelimiter("\n");
+
+      while (s.hasNext()) {
+        String next = s.next();
+
+        // Filter out the comment lines
+        if (!next.startsWith("#")) {
+          list.add(next);
+        }
+      }
+    }
+
+    String secretJSON = String.join("", list);
+
+    ObjectMapper mapper = new ObjectMapper();
+
+    // Declared as Map to match the TypeReference; the concrete map implementation is Jackson's choice
+    Map<String, String> values = mapper.readValue(secretJSON, new TypeReference<Map<String, String>>() {});
+    String pubKey = values.get("public").replace(".ed25519", "");
+    String privateKey = values.get("private").replace(".ed25519", "");
+
+    Bytes pubKeyBytes = Base64.decode(pubKey);
+    Bytes privKeyBytes = Base64.decode(privateKey);
+
+    Signature.PublicKey pub = Signature.PublicKey.fromBytes(pubKeyBytes);
+    Signature.SecretKey secretKey = Signature.SecretKey.fromBytes(privKeyBytes);
+
+    return new Signature.KeyPair(pub, secretKey);
+  }
+
+  @Test
+  @Disabled
+  public void postMessageTest(@VertxInstance Vertx vertx) throws Exception {
+
+    RPCHandler rpcHandler = makeRPCHandler(vertx);
+
+
+    List<AsyncResult<RPCMessage>> results = new ArrayList<>();
+
+    for (int i = 0; i < 20; i++) {
+      // Note: in a real use case, this would more likely be a Java class with these fields
+      HashMap<String, String> params = new HashMap<>();
+      params.put("type", "post");
+      params.put("text", "test test " + i);
+
+      RPCAsyncRequest asyncRequest = new RPCAsyncRequest(new RPCFunction("publish"), Arrays.asList(params));
+
+      AsyncResult<RPCMessage> rpcMessageAsyncResult = rpcHandler.makeAsyncRequest(asyncRequest);
+
+      results.add(rpcMessageAsyncResult);
+
+    }
+
+    List<RPCMessage> rpcMessages = AsyncResult.combine(results).get();
+
+    rpcMessages.forEach(msg -> System.out.println(msg.asString()));
+  }
+
+  private RPCHandler makeRPCHandler(Vertx vertx) throws Exception {
+    Signature.KeyPair keyPair = getLocalKeys();
+    String networkKeyBase64 = "1KHLiKZvAvjbY1ziZEHMXawbCEIM6qwjCDm3VYRan/s=";
+    Bytes32 networkKeyBytes32 = Bytes32.wrap(Base64.decode(networkKeyBase64));
+
+    String host = "localhost";
+    int port = 8008;
+    LoggerProvider loggerProvider = SimpleLogger.withLogLevel(Level.DEBUG).toPrintWriter(
+        new PrintWriter(new BufferedWriter(new OutputStreamWriter(System.out, UTF_8))));
+    LoglLogDelegateFactory.setProvider(loggerProvider);
+
+    SecureScuttlebuttVertxClient secureScuttlebuttVertxClient =
+        new SecureScuttlebuttVertxClient(loggerProvider, vertx, keyPair, networkKeyBytes32);
+
+    AsyncResult<RPCHandler> onConnect =
+        secureScuttlebuttVertxClient.connectTo(port, host, keyPair.publicKey(), (sender, terminationFn) -> {
+
+          return new RPCHandler(sender, terminationFn, new ObjectMapper(), loggerProvider);
+        });
+
+    return onConnect.get();
+  }
+
+
+  /**
+   * Opens a 'createUserStream' stream for the local identity and waits for it to end. A stream error fails the test
+   * instead of leaving it waiting forever.
+   */
+  @Test
+  @Disabled
+  public void streamTest(@VertxInstance Vertx vertx) throws Exception {
+
+    RPCHandler handler = makeRPCHandler(vertx);
+    Signature.PublicKey publicKey = getLocalKeys().publicKey();
+
+    String pubKey = "@" + Base64.encode(publicKey.bytes()) + ".ed25519";
+
+    Map<String, String> params = new HashMap<>();
+    params.put("id", pubKey);
+
+    CompletableAsyncResult<Void> streamEnded = AsyncResult.incomplete();
+
+    RPCStreamRequest streamRequest = new RPCStreamRequest(new RPCFunction("createUserStream"), Arrays.asList(params));
+
+    handler.openStream(streamRequest, (closeStream) -> new ScuttlebuttStreamHandler() {
+      @Override
+      public void onMessage(RPCMessage message) {
+        System.out.print(message.asString());
+      }
+
+      @Override
+      public void onStreamEnd() {
+        streamEnded.complete(null);
+      }
+
+      @Override
+      public void onStreamError(Exception ex) {
+        // Propagate the failure so that the wait below terminates and the test fails
+        streamEnded.completeExceptionally(ex);
+      }
+    });
+
+    // Wait until the stream is complete
+    streamEnded.get();
+
+  }
+
+
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@tuweni.apache.org
For additional commands, e-mail: commits-help@tuweni.apache.org