You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tuweni.apache.org by to...@apache.org on 2019/04/23 23:09:13 UTC

[incubator-tuweni] 32/43: Bring over latest changes on scuttlebutt

This is an automated email from the ASF dual-hosted git repository.

toulmean pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-tuweni.git

commit ec647af92a1ba8537b2723e7f3d0b690eb1dc2d3
Author: Antoine Toulme <to...@apache.org>
AuthorDate: Mon Apr 22 14:11:23 2019 -0700

    Bring over latest changes on scuttlebutt
---
 scuttlebutt-rpc/build.gradle                       |   3 +-
 .../tuweni/scuttlebutt/rpc/RPCAsyncRequest.java    |  12 +-
 .../tuweni/scuttlebutt/rpc/RPCErrorBody.java       |  10 +-
 .../apache/tuweni/scuttlebutt/rpc/RPCFunction.java |  10 +-
 .../apache/tuweni/scuttlebutt/rpc/RPCMessage.java  |  31 ++-
 .../tuweni/scuttlebutt/rpc/RPCRequestBody.java     |  12 +-
 .../tuweni/scuttlebutt/rpc/RPCRequestType.java     |  10 +-
 .../apache/tuweni/scuttlebutt/rpc/RPCResponse.java |  80 +++++++
 .../tuweni/scuttlebutt/rpc/RPCStreamRequest.java   |  12 +-
 .../tuweni/scuttlebutt/rpc/mux/Multiplexer.java    |  22 +-
 .../tuweni/scuttlebutt/rpc/mux/RPCHandler.java     | 243 +++++++++++----------
 .../rpc/mux/ScuttlebuttStreamHandler.java          |  14 +-
 .../mux/exceptions/ConnectionClosedException.java  |  10 +-
 .../mux/exceptions/RPCRequestFailedException.java  |  20 ++
 .../scuttlebutt/rpc/PatchworkIntegrationTest.java  |   8 -
 .../rpc/mux/PatchworkIntegrationTest.java          |  99 ++++-----
 16 files changed, 361 insertions(+), 235 deletions(-)

diff --git a/scuttlebutt-rpc/build.gradle b/scuttlebutt-rpc/build.gradle
index cf71a72..1c04ff4 100644
--- a/scuttlebutt-rpc/build.gradle
+++ b/scuttlebutt-rpc/build.gradle
@@ -1,9 +1,10 @@
-description = 'Scuttlebutt Handshake library'
+description = 'Scuttlebutt RPC library'
 
 dependencies {
   compile project(':bytes')
   compile project(':concurrent')
   compile project(':crypto')
+  compileOnly 'io.vertx:vertx-core'
   compile project(':scuttlebutt')
   compile project(':scuttlebutt-handshake')
   compile 'org.logl:logl-api'
diff --git a/scuttlebutt-rpc/src/main/java/org/apache/tuweni/scuttlebutt/rpc/RPCAsyncRequest.java b/scuttlebutt-rpc/src/main/java/org/apache/tuweni/scuttlebutt/rpc/RPCAsyncRequest.java
index f4281d7..b94c224 100644
--- a/scuttlebutt-rpc/src/main/java/org/apache/tuweni/scuttlebutt/rpc/RPCAsyncRequest.java
+++ b/scuttlebutt-rpc/src/main/java/org/apache/tuweni/scuttlebutt/rpc/RPCAsyncRequest.java
@@ -1,8 +1,8 @@
 /*
- * Copyright 2019 ConsenSys AG.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
  *
  * http://www.apache.org/licenses/LICENSE-2.0
  *
@@ -10,9 +10,9 @@
  * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
  * specific language governing permissions and limitations under the License.
  */
-package net.consensys.cava.scuttlebutt.rpc;
+package org.apache.tuweni.scuttlebutt.rpc;
 
-import net.consensys.cava.bytes.Bytes;
+import org.apache.tuweni.bytes.Bytes;
 
 import java.util.List;
 
diff --git a/scuttlebutt-rpc/src/main/java/org/apache/tuweni/scuttlebutt/rpc/RPCErrorBody.java b/scuttlebutt-rpc/src/main/java/org/apache/tuweni/scuttlebutt/rpc/RPCErrorBody.java
index 87fa5b6..bd1e888 100644
--- a/scuttlebutt-rpc/src/main/java/org/apache/tuweni/scuttlebutt/rpc/RPCErrorBody.java
+++ b/scuttlebutt-rpc/src/main/java/org/apache/tuweni/scuttlebutt/rpc/RPCErrorBody.java
@@ -1,8 +1,8 @@
 /*
- * Copyright 2019 ConsenSys AG.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
  *
  * http://www.apache.org/licenses/LICENSE-2.0
  *
@@ -10,7 +10,7 @@
  * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
  * specific language governing permissions and limitations under the License.
  */
-package net.consensys.cava.scuttlebutt.rpc;
+package org.apache.tuweni.scuttlebutt.rpc;
 
 /**
  * An RPC message response body which contains an error
diff --git a/scuttlebutt-rpc/src/main/java/org/apache/tuweni/scuttlebutt/rpc/RPCFunction.java b/scuttlebutt-rpc/src/main/java/org/apache/tuweni/scuttlebutt/rpc/RPCFunction.java
index 511a491..cef5071 100644
--- a/scuttlebutt-rpc/src/main/java/org/apache/tuweni/scuttlebutt/rpc/RPCFunction.java
+++ b/scuttlebutt-rpc/src/main/java/org/apache/tuweni/scuttlebutt/rpc/RPCFunction.java
@@ -1,8 +1,8 @@
 /*
- * Copyright 2019 ConsenSys AG.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
  *
  * http://www.apache.org/licenses/LICENSE-2.0
  *
@@ -10,7 +10,7 @@
  * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
  * specific language governing permissions and limitations under the License.
  */
-package net.consensys.cava.scuttlebutt.rpc;
+package org.apache.tuweni.scuttlebutt.rpc;
 
 import java.util.ArrayList;
 import java.util.List;
diff --git a/scuttlebutt-rpc/src/main/java/org/apache/tuweni/scuttlebutt/rpc/RPCMessage.java b/scuttlebutt-rpc/src/main/java/org/apache/tuweni/scuttlebutt/rpc/RPCMessage.java
index 2f21645..7dade5d 100644
--- a/scuttlebutt-rpc/src/main/java/org/apache/tuweni/scuttlebutt/rpc/RPCMessage.java
+++ b/scuttlebutt-rpc/src/main/java/org/apache/tuweni/scuttlebutt/rpc/RPCMessage.java
@@ -15,11 +15,12 @@ package org.apache.tuweni.scuttlebutt.rpc;
 import static java.nio.charset.StandardCharsets.UTF_8;
 
 import org.apache.tuweni.bytes.Bytes;
+import org.apache.tuweni.scuttlebutt.rpc.mux.exceptions.RPCRequestFailedException;
 
 import java.io.IOException;
+import java.util.Optional;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
-import com.google.common.base.Optional;
 
 /**
  * Decoded RPC message, making elements of the message available directly.
@@ -104,17 +105,41 @@ public final class RPCMessage {
 
     if (!isErrorMessage()) {
       // If the body of the response is 'true' or the error flag isn't set, it's a successful end condition
-      return Optional.absent();
+      return Optional.empty();
     } else {
       try {
         return Optional.of(asJSON(objectMapper, RPCErrorBody.class));
       } catch (IOException e) {
-        return Optional.absent();
+        return Optional.empty();
       }
     }
   }
 
   /**
+   *
+   * @param objectMapper the object mapper to deserialize the error with.
+   *
+   * @return an exception if this represents an error RPC response, otherwise nothing
+   */
+  public Optional<RPCRequestFailedException> getException(ObjectMapper objectMapper) {
+    if (isErrorMessage()) {
+      Optional<RPCRequestFailedException> exception =
+          getErrorBody(objectMapper).map(errorBody -> new RPCRequestFailedException(errorBody.getMessage()));
+
+      if (!exception.isPresent()) {
+        // If we failed to deserialize into the RPCErrorBody type there may be a bug in the server implementation
+        // which prevented it from returning the correct type, so we just report whatever it returned
+        return Optional.of(new RPCRequestFailedException(this.asString()));
+      } else {
+        return exception;
+      }
+
+    } else {
+      return Optional.empty();
+    }
+  }
+
+  /**
    * Provides the type of the body of the message: a binary message, a UTF-8 string or a JSON message.
    * 
    * @return the type of the body: a binary message, a UTF-8 string or a JSON message
diff --git a/scuttlebutt-rpc/src/main/java/org/apache/tuweni/scuttlebutt/rpc/RPCRequestBody.java b/scuttlebutt-rpc/src/main/java/org/apache/tuweni/scuttlebutt/rpc/RPCRequestBody.java
index 70ad4e3..471d5b9 100644
--- a/scuttlebutt-rpc/src/main/java/org/apache/tuweni/scuttlebutt/rpc/RPCRequestBody.java
+++ b/scuttlebutt-rpc/src/main/java/org/apache/tuweni/scuttlebutt/rpc/RPCRequestBody.java
@@ -1,8 +1,8 @@
 /*
- * Copyright 2019 ConsenSys AG.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
  *
  * http://www.apache.org/licenses/LICENSE-2.0
  *
@@ -10,9 +10,9 @@
  * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
  * specific language governing permissions and limitations under the License.
  */
-package net.consensys.cava.scuttlebutt.rpc;
+package org.apache.tuweni.scuttlebutt.rpc;
 
-import net.consensys.cava.bytes.Bytes;
+import org.apache.tuweni.bytes.Bytes;
 
 import java.util.List;
 
diff --git a/scuttlebutt-rpc/src/main/java/org/apache/tuweni/scuttlebutt/rpc/RPCRequestType.java b/scuttlebutt-rpc/src/main/java/org/apache/tuweni/scuttlebutt/rpc/RPCRequestType.java
index 9703da4..85a18b2 100644
--- a/scuttlebutt-rpc/src/main/java/org/apache/tuweni/scuttlebutt/rpc/RPCRequestType.java
+++ b/scuttlebutt-rpc/src/main/java/org/apache/tuweni/scuttlebutt/rpc/RPCRequestType.java
@@ -1,8 +1,8 @@
 /*
- * Copyright 2019 ConsenSys AG.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
  *
  * http://www.apache.org/licenses/LICENSE-2.0
  *
@@ -10,7 +10,7 @@
  * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
  * specific language governing permissions and limitations under the License.
  */
-package net.consensys.cava.scuttlebutt.rpc;
+package org.apache.tuweni.scuttlebutt.rpc;
 
 import com.fasterxml.jackson.annotation.JsonProperty;
 
diff --git a/scuttlebutt-rpc/src/main/java/org/apache/tuweni/scuttlebutt/rpc/RPCResponse.java b/scuttlebutt-rpc/src/main/java/org/apache/tuweni/scuttlebutt/rpc/RPCResponse.java
new file mode 100644
index 0000000..73c6e86
--- /dev/null
+++ b/scuttlebutt-rpc/src/main/java/org/apache/tuweni/scuttlebutt/rpc/RPCResponse.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.tuweni.scuttlebutt.rpc;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+
+import org.apache.tuweni.bytes.Bytes;
+import org.apache.tuweni.scuttlebutt.rpc.RPCFlag.BodyType;
+
+import java.io.IOException;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+/**
+ * A successful RPC response.
+ */
+public class RPCResponse {
+
+  private final Bytes body;
+  private final BodyType bodyType;
+
+  /**
+   * A successful RPC response.
+   *
+   * @param body the body of the response in bytes
+   * @param bodyType the type of the response (e.g. JSON, UTF-8 or binary.)
+   */
+  public RPCResponse(Bytes body, BodyType bodyType) {
+
+    this.body = body;
+    this.bodyType = bodyType;
+  }
+
+  /**
+   * @return the RPC response body
+   */
+  public Bytes body() {
+    return body;
+  }
+
+  /**
+   * @return The type of the data contained in the body.
+   */
+  public BodyType bodyType() {
+    return bodyType;
+  }
+
+  /**
+   * Provides the body of the message as a UTF-8 string.
+   *
+   * @return the body of the message as a UTF-8 string
+   */
+  public String asString() {
+    return new String(body().toArrayUnsafe(), UTF_8);
+  }
+
+  /**
+   * Provides the body of the message, marshalled as a JSON object.
+   *
+   * @param objectMapper the object mapper to deserialize with
+   * @param clazz the JSON object class
+   * @param <T> the matching JSON object class
+   * @return a new instance of the JSON object class
+   * @throws IOException if an error occurs during marshalling
+   */
+  public <T> T asJSON(ObjectMapper objectMapper, Class<T> clazz) throws IOException {
+    return objectMapper.readerFor(clazz).readValue(body().toArrayUnsafe());
+  }
+
+}
diff --git a/scuttlebutt-rpc/src/main/java/org/apache/tuweni/scuttlebutt/rpc/RPCStreamRequest.java b/scuttlebutt-rpc/src/main/java/org/apache/tuweni/scuttlebutt/rpc/RPCStreamRequest.java
index ff198da..1249182 100644
--- a/scuttlebutt-rpc/src/main/java/org/apache/tuweni/scuttlebutt/rpc/RPCStreamRequest.java
+++ b/scuttlebutt-rpc/src/main/java/org/apache/tuweni/scuttlebutt/rpc/RPCStreamRequest.java
@@ -1,8 +1,8 @@
 /*
- * Copyright 2019 ConsenSys AG.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
  *
  * http://www.apache.org/licenses/LICENSE-2.0
  *
@@ -10,9 +10,9 @@
  * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
  * specific language governing permissions and limitations under the License.
  */
-package net.consensys.cava.scuttlebutt.rpc;
+package org.apache.tuweni.scuttlebutt.rpc;
 
-import net.consensys.cava.bytes.Bytes;
+import org.apache.tuweni.bytes.Bytes;
 
 import java.util.List;
 
diff --git a/scuttlebutt-rpc/src/main/java/org/apache/tuweni/scuttlebutt/rpc/mux/Multiplexer.java b/scuttlebutt-rpc/src/main/java/org/apache/tuweni/scuttlebutt/rpc/mux/Multiplexer.java
index 722dcbf..66b7d74 100644
--- a/scuttlebutt-rpc/src/main/java/org/apache/tuweni/scuttlebutt/rpc/mux/Multiplexer.java
+++ b/scuttlebutt-rpc/src/main/java/org/apache/tuweni/scuttlebutt/rpc/mux/Multiplexer.java
@@ -1,8 +1,8 @@
 /*
- * Copyright 2019 ConsenSys AG.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
  *
  * http://www.apache.org/licenses/LICENSE-2.0
  *
@@ -10,13 +10,13 @@
  * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
  * specific language governing permissions and limitations under the License.
  */
-package net.consensys.cava.scuttlebutt.rpc.mux;
+package org.apache.tuweni.scuttlebutt.rpc.mux;
 
-import net.consensys.cava.concurrent.AsyncResult;
-import net.consensys.cava.scuttlebutt.rpc.RPCAsyncRequest;
-import net.consensys.cava.scuttlebutt.rpc.RPCMessage;
-import net.consensys.cava.scuttlebutt.rpc.RPCStreamRequest;
-import net.consensys.cava.scuttlebutt.rpc.mux.exceptions.ConnectionClosedException;
+import org.apache.tuweni.concurrent.AsyncResult;
+import org.apache.tuweni.scuttlebutt.rpc.RPCAsyncRequest;
+import org.apache.tuweni.scuttlebutt.rpc.RPCResponse;
+import org.apache.tuweni.scuttlebutt.rpc.RPCStreamRequest;
+import org.apache.tuweni.scuttlebutt.rpc.mux.exceptions.ConnectionClosedException;
 
 import java.util.function.Function;
 
@@ -35,7 +35,7 @@ public interface Multiplexer {
    *
    * @return an async result which will be completed with the result or an error if the request fails.
    */
-  AsyncResult<RPCMessage> makeAsyncRequest(RPCAsyncRequest request);
+  AsyncResult<RPCResponse> makeAsyncRequest(RPCAsyncRequest request) throws JsonProcessingException;
 
   /**
    * Creates a request which opens a stream (e.g. a 'source' in the protocol docs.)
diff --git a/scuttlebutt-rpc/src/main/java/org/apache/tuweni/scuttlebutt/rpc/mux/RPCHandler.java b/scuttlebutt-rpc/src/main/java/org/apache/tuweni/scuttlebutt/rpc/mux/RPCHandler.java
index 0ac4583..4655862 100644
--- a/scuttlebutt-rpc/src/main/java/org/apache/tuweni/scuttlebutt/rpc/mux/RPCHandler.java
+++ b/scuttlebutt-rpc/src/main/java/org/apache/tuweni/scuttlebutt/rpc/mux/RPCHandler.java
@@ -1,8 +1,8 @@
 /*
- * Copyright 2019 ConsenSys AG.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
  *
  * http://www.apache.org/licenses/LICENSE-2.0
  *
@@ -10,41 +10,36 @@
  * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
  * specific language governing permissions and limitations under the License.
  */
-package net.consensys.cava.scuttlebutt.rpc.mux;
-
-import net.consensys.cava.bytes.Bytes;
-import net.consensys.cava.concurrent.AsyncResult;
-import net.consensys.cava.concurrent.CompletableAsyncResult;
-import net.consensys.cava.scuttlebutt.handshake.vertx.ClientHandler;
-import net.consensys.cava.scuttlebutt.rpc.RPCAsyncRequest;
-import net.consensys.cava.scuttlebutt.rpc.RPCCodec;
-import net.consensys.cava.scuttlebutt.rpc.RPCErrorBody;
-import net.consensys.cava.scuttlebutt.rpc.RPCFlag;
-import net.consensys.cava.scuttlebutt.rpc.RPCMessage;
-import net.consensys.cava.scuttlebutt.rpc.RPCStreamRequest;
-import net.consensys.cava.scuttlebutt.rpc.mux.exceptions.ConnectionClosedException;
+package org.apache.tuweni.scuttlebutt.rpc.mux;
+
+import org.apache.tuweni.bytes.Bytes;
+import org.apache.tuweni.concurrent.AsyncResult;
+import org.apache.tuweni.concurrent.CompletableAsyncResult;
+import org.apache.tuweni.scuttlebutt.handshake.vertx.ClientHandler;
+import org.apache.tuweni.scuttlebutt.rpc.RPCAsyncRequest;
+import org.apache.tuweni.scuttlebutt.rpc.RPCCodec;
+import org.apache.tuweni.scuttlebutt.rpc.RPCFlag;
+import org.apache.tuweni.scuttlebutt.rpc.RPCMessage;
+import org.apache.tuweni.scuttlebutt.rpc.RPCResponse;
+import org.apache.tuweni.scuttlebutt.rpc.RPCStreamRequest;
+import org.apache.tuweni.scuttlebutt.rpc.mux.exceptions.ConnectionClosedException;
+import org.apache.tuweni.scuttlebutt.rpc.mux.exceptions.RPCRequestFailedException;
 
 import java.util.HashMap;
 import java.util.Map;
+import java.util.Optional;
 import java.util.function.Consumer;
 import java.util.function.Function;
 
 import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.ObjectMapper;
-import com.google.common.base.Optional;
+import io.vertx.core.Handler;
+import io.vertx.core.Vertx;
 import org.logl.Logger;
 import org.logl.LoggerProvider;
 
 /**
- * Handles RPC requests and responses from an active connection to a scuttlebutt node
- *
- * Note: the public methods on this class are synchronized so that a request is rejected if the connection has been
- * closed before it begins and any 'in flight' requests are ended exceptionally with a 'connection closed' error without
- * new incoming requests being added to the maps by threads.
- *
- * In the future,we could perhaps be carefully more fine grained about the locking if we require a high degree of
- * concurrency.
- *
+ * Handles RPC requests and responses from an active connection to a scuttlebutt node.
  */
 public class RPCHandler implements Multiplexer, ClientHandler {
 
@@ -53,7 +48,13 @@ public class RPCHandler implements Multiplexer, ClientHandler {
   private final Runnable connectionCloser;
   private final ObjectMapper objectMapper;
 
-  private Map<Integer, CompletableAsyncResult<RPCMessage>> awaitingAsyncResponse = new HashMap<>();
+  /**
+   * We run each update on the vertx event loop to update the request state synchronously, and to handle the
+   * underlying connection closing by failing the in progress requests and not accepting future requests
+   */
+  private final Vertx vertx;
+
+  private Map<Integer, CompletableAsyncResult<RPCResponse>> awaitingAsyncResponse = new HashMap<>();
   private Map<Integer, ScuttlebuttStreamHandler> streams = new HashMap<>();
 
   private boolean closed;
@@ -61,16 +62,19 @@ public class RPCHandler implements Multiplexer, ClientHandler {
   /**
    * Makes RPC requests over a connection
    *
+   * @param vertx The vertx instance to queue requests with
    * @param messageSender sends the request to the node
    * @param terminationFn closes the connection
    * @param objectMapper the objectMapper to serialize and deserialize message request and response bodies
    * @param logger
    */
   public RPCHandler(
+      Vertx vertx,
       Consumer<Bytes> messageSender,
       Runnable terminationFn,
       ObjectMapper objectMapper,
       LoggerProvider logger) {
+    this.vertx = vertx;
     this.messageSender = messageSender;
     this.connectionCloser = terminationFn;
     this.closed = false;
@@ -80,86 +84,116 @@ public class RPCHandler implements Multiplexer, ClientHandler {
   }
 
   @Override
-  public synchronized AsyncResult<RPCMessage> makeAsyncRequest(RPCAsyncRequest request) {
+  public AsyncResult<RPCResponse> makeAsyncRequest(RPCAsyncRequest request) throws JsonProcessingException {
 
-    CompletableAsyncResult<RPCMessage> result = AsyncResult.incomplete();
+    Bytes bodyBytes = request.toEncodedRpcMessage(objectMapper);
 
-    if (closed) {
-      result.completeExceptionally(new ConnectionClosedException());
-    }
+    CompletableAsyncResult<RPCResponse> result = AsyncResult.incomplete();
 
-    try {
-      RPCMessage message = new RPCMessage(request.toEncodedRpcMessage(objectMapper));
-      int requestNumber = message.requestNumber();
-      awaitingAsyncResponse.put(requestNumber, result);
-      Bytes bytes = RPCCodec.encodeRequest(message.body(), requestNumber, request.getRPCFlags());
-      messageSender.accept(bytes);
+    Handler<Void> synchronizedAddRequest = (event) -> {
+      if (closed) {
+        result.completeExceptionally(new ConnectionClosedException());
+      } else {
+        RPCMessage message = new RPCMessage(bodyBytes);
+        int requestNumber = message.requestNumber();
 
-    } catch (JsonProcessingException e) {
-      result.completeExceptionally(e);
-    }
+        awaitingAsyncResponse.put(requestNumber, result);
+        Bytes bytes = RPCCodec.encodeRequest(message.body(), requestNumber, request.getRPCFlags());
+        sendBytes(bytes);
+      }
+    };
 
+    vertx.runOnContext(synchronizedAddRequest);
     return result;
   }
 
   @Override
-  public synchronized void openStream(
-      RPCStreamRequest request,
-      Function<Runnable, ScuttlebuttStreamHandler> responseSink) throws JsonProcessingException,
-      ConnectionClosedException {
+  public void openStream(RPCStreamRequest request, Function<Runnable, ScuttlebuttStreamHandler> responseSink)
+      throws JsonProcessingException {
 
-    if (closed) {
-      throw new ConnectionClosedException();
-    }
+    Bytes bodyBytes = request.toEncodedRpcMessage(objectMapper);
+
+    Handler<Void> synchronizedRequest = (event) -> {
 
-    try {
       RPCFlag[] rpcFlags = request.getRPCFlags();
-      RPCMessage message = new RPCMessage(request.toEncodedRpcMessage(objectMapper));
+      RPCMessage message = new RPCMessage(bodyBytes);
       int requestNumber = message.requestNumber();
 
-      Bytes bytes = RPCCodec.encodeRequest(message.body(), requestNumber, rpcFlags);
-      messageSender.accept(bytes);
-
-      Runnable closeStreamHandler = new Runnable() {
-        @Override
-        public void run() {
+      Bytes requestBytes = RPCCodec.encodeRequest(message.body(), requestNumber, rpcFlags);
 
-          try {
-            Bytes bytes = RPCCodec.encodeStreamEndRequest(requestNumber);
-            messageSender.accept(bytes);
-          } catch (JsonProcessingException e) {
-            logger.warn("Unexpectedly could not encode stream end message to JSON.");
-          }
+      Runnable closeStreamHandler = () -> {
 
+        try {
+          Bytes streamEnd = RPCCodec.encodeStreamEndRequest(requestNumber);
+          sendBytes(streamEnd);
+        } catch (JsonProcessingException e) {
+          logger.warn("Unexpectedly could not encode stream end message to JSON.");
         }
+
       };
 
       ScuttlebuttStreamHandler scuttlebuttStreamHandler = responseSink.apply(closeStreamHandler);
 
-      streams.put(requestNumber, scuttlebuttStreamHandler);
-    } catch (JsonProcessingException ex) {
-      throw ex;
-    }
+      if (closed) {
+        scuttlebuttStreamHandler.onStreamError(new ConnectionClosedException());
+      } else {
+        streams.put(requestNumber, scuttlebuttStreamHandler);
+        sendBytes(requestBytes);
+      }
+
+
+    };
+
+    vertx.runOnContext(synchronizedRequest);
   }
 
   @Override
-  public synchronized void close() {
-    connectionCloser.run();
+  public void close() {
+    vertx.runOnContext((event) -> {
+      connectionCloser.run();
+    });
   }
 
   @Override
-  public synchronized void receivedMessage(Bytes message) {
+  public void receivedMessage(Bytes message) {
 
-    RPCMessage rpcMessage = new RPCMessage(message);
+    Handler<Void> synchronizedHandleMessage = (event) -> {
+      RPCMessage rpcMessage = new RPCMessage(message);
 
-    // A negative request number indicates that this is a response, rather than a request that this node
-    // should service
-    if (rpcMessage.requestNumber() < 0) {
-      handleResponse(rpcMessage);
-    } else {
-      handleRequest(rpcMessage);
-    }
+      // A negative request number indicates that this is a response, rather than a request that this node
+      // should service
+      if (rpcMessage.requestNumber() < 0) {
+        handleResponse(rpcMessage);
+      } else {
+        handleRequest(rpcMessage);
+      }
+    };
 
+    vertx.runOnContext(synchronizedHandleMessage);
+  }
+
+  @Override
+  public void streamClosed() {
+
+    Handler<Void> synchronizedCloseStream = (event) -> {
+      closed = true;
+
+      streams.forEach((key, streamHandler) -> {
+        streamHandler.onStreamError(new ConnectionClosedException());
+      });
+
+      streams.clear();
+
+      awaitingAsyncResponse.forEach((key, value) -> {
+        if (!value.isDone()) {
+          value.completeExceptionally(new ConnectionClosedException());
+        }
+      });
+
+      awaitingAsyncResponse.clear();
+    };
+
+    vertx.runOnContext(synchronizedCloseStream);
   }
 
   private void handleRequest(RPCMessage rpcMessage) {
@@ -179,6 +213,8 @@ public class RPCHandler implements Multiplexer, ClientHandler {
 
     boolean isStream = RPCFlag.Stream.STREAM.isApplied(rpcFlags);
 
+    Optional<RPCRequestFailedException> exception = response.getException(objectMapper);
+
     if (isStream) {
       ScuttlebuttStreamHandler scuttlebuttStreamHandler = streams.get(requestNumber);
 
@@ -187,20 +223,11 @@ public class RPCHandler implements Multiplexer, ClientHandler {
         if (response.isSuccessfulLastMessage()) {
           streams.remove(requestNumber);
           scuttlebuttStreamHandler.onStreamEnd();
-        } else if (response.isErrorMessage()) {
-
-          Optional<RPCErrorBody> errorBody = response.getErrorBody(objectMapper);
-
-          if (errorBody.isPresent()) {
-            scuttlebuttStreamHandler.onStreamError(new Exception(errorBody.get().getMessage()));
-          } else {
-            // This shouldn't happen, but for safety we fall back to just writing the whole body in the exception message
-            // if we fail to marshall it for whatever reason
-            scuttlebuttStreamHandler.onStreamError(new Exception(response.asString()));
-          }
-
+        } else if (exception.isPresent()) {
+          scuttlebuttStreamHandler.onStreamError(exception.get());
         } else {
-          scuttlebuttStreamHandler.onMessage(response);
+          RPCResponse successfulResponse = new RPCResponse(response.body(), response.bodyType());
+          scuttlebuttStreamHandler.onMessage(successfulResponse);
         }
       } else {
         logger.warn(
@@ -212,11 +239,18 @@ public class RPCHandler implements Multiplexer, ClientHandler {
 
     } else {
 
-      CompletableAsyncResult<RPCMessage> rpcMessageFuture = awaitingAsyncResponse.get(requestNumber);
+      CompletableAsyncResult<RPCResponse> rpcMessageFuture = awaitingAsyncResponse.remove(requestNumber);
 
       if (rpcMessageFuture != null) {
-        rpcMessageFuture.complete(response);
-        awaitingAsyncResponse.remove(requestNumber);
+
+        if (exception.isPresent()) {
+          rpcMessageFuture.completeExceptionally(exception.get());
+        } else {
+          RPCResponse successfulResponse = new RPCResponse(response.body(), response.bodyType());
+
+          rpcMessageFuture.complete(successfulResponse);
+        }
+
       } else {
         logger.warn(
             "Couldn't find async handler for RPC response with request number "
@@ -228,23 +262,8 @@ public class RPCHandler implements Multiplexer, ClientHandler {
 
   }
 
-  @Override
-  public void streamClosed() {
-    this.closed = true;
-
-    streams.forEach((key, streamHandler) -> {
-      streamHandler.onStreamError(new ConnectionClosedException());
-    });
-
-    streams.clear();
-
-    awaitingAsyncResponse.forEach((key, value) -> {
-      if (!value.isDone()) {
-        value.completeExceptionally(new ConnectionClosedException());
-      }
-
-    });
-
-
+  private void sendBytes(Bytes bytes) {
+    messageSender.accept(bytes);
   }
+
 }
diff --git a/scuttlebutt-rpc/src/main/java/org/apache/tuweni/scuttlebutt/rpc/mux/ScuttlebuttStreamHandler.java b/scuttlebutt-rpc/src/main/java/org/apache/tuweni/scuttlebutt/rpc/mux/ScuttlebuttStreamHandler.java
index d108663..d64b54a 100644
--- a/scuttlebutt-rpc/src/main/java/org/apache/tuweni/scuttlebutt/rpc/mux/ScuttlebuttStreamHandler.java
+++ b/scuttlebutt-rpc/src/main/java/org/apache/tuweni/scuttlebutt/rpc/mux/ScuttlebuttStreamHandler.java
@@ -1,8 +1,8 @@
 /*
- * Copyright 2019 ConsenSys AG.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
  *
  * http://www.apache.org/licenses/LICENSE-2.0
  *
@@ -10,9 +10,9 @@
  * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
  * specific language governing permissions and limitations under the License.
  */
-package net.consensys.cava.scuttlebutt.rpc.mux;
+package org.apache.tuweni.scuttlebutt.rpc.mux;
 
-import net.consensys.cava.scuttlebutt.rpc.RPCMessage;
+import org.apache.tuweni.scuttlebutt.rpc.RPCResponse;
 
 /**
  * Handles incoming items from a result stream
@@ -24,7 +24,7 @@ public interface ScuttlebuttStreamHandler {
    *
    * @param message
    */
-  void onMessage(RPCMessage message);
+  void onMessage(RPCResponse message);
 
   /**
    * Invoked when the stream has been closed.
diff --git a/scuttlebutt-rpc/src/main/java/org/apache/tuweni/scuttlebutt/rpc/mux/exceptions/ConnectionClosedException.java b/scuttlebutt-rpc/src/main/java/org/apache/tuweni/scuttlebutt/rpc/mux/exceptions/ConnectionClosedException.java
index 160946c..a3c269c 100644
--- a/scuttlebutt-rpc/src/main/java/org/apache/tuweni/scuttlebutt/rpc/mux/exceptions/ConnectionClosedException.java
+++ b/scuttlebutt-rpc/src/main/java/org/apache/tuweni/scuttlebutt/rpc/mux/exceptions/ConnectionClosedException.java
@@ -1,8 +1,8 @@
 /*
- * Copyright 2019 ConsenSys AG.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
  *
  * http://www.apache.org/licenses/LICENSE-2.0
  *
@@ -10,7 +10,7 @@
  * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
  * specific language governing permissions and limitations under the License.
  */
-package net.consensys.cava.scuttlebutt.rpc.mux.exceptions;
+package org.apache.tuweni.scuttlebutt.rpc.mux.exceptions;
 
 public class ConnectionClosedException extends Exception {
 
diff --git a/scuttlebutt-rpc/src/main/java/org/apache/tuweni/scuttlebutt/rpc/mux/exceptions/RPCRequestFailedException.java b/scuttlebutt-rpc/src/main/java/org/apache/tuweni/scuttlebutt/rpc/mux/exceptions/RPCRequestFailedException.java
new file mode 100644
index 0000000..5baefca
--- /dev/null
+++ b/scuttlebutt-rpc/src/main/java/org/apache/tuweni/scuttlebutt/rpc/mux/exceptions/RPCRequestFailedException.java
@@ -0,0 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.tuweni.scuttlebutt.rpc.mux.exceptions;
+
+public final class RPCRequestFailedException extends RuntimeException {
+
+  public RPCRequestFailedException(String errorMessage) {
+    super(errorMessage);
+  }
+}
diff --git a/scuttlebutt-rpc/src/test/java/org/apache/tuweni/scuttlebutt/rpc/PatchworkIntegrationTest.java b/scuttlebutt-rpc/src/test/java/org/apache/tuweni/scuttlebutt/rpc/PatchworkIntegrationTest.java
index 3178fae..9d8527d 100644
--- a/scuttlebutt-rpc/src/test/java/org/apache/tuweni/scuttlebutt/rpc/PatchworkIntegrationTest.java
+++ b/scuttlebutt-rpc/src/test/java/org/apache/tuweni/scuttlebutt/rpc/PatchworkIntegrationTest.java
@@ -16,13 +16,11 @@ import static java.nio.charset.StandardCharsets.UTF_8;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertTrue;
-import static org.junit.jupiter.api.Assumptions.assumeTrue;
 
 import org.apache.tuweni.bytes.Bytes;
 import org.apache.tuweni.bytes.Bytes32;
 import org.apache.tuweni.concurrent.AsyncResult;
 import org.apache.tuweni.crypto.sodium.Signature;
-import org.apache.tuweni.crypto.sodium.Sodium;
 import org.apache.tuweni.io.Base64;
 import org.apache.tuweni.junit.VertxExtension;
 import org.apache.tuweni.junit.VertxInstance;
@@ -43,7 +41,6 @@ import com.fasterxml.jackson.core.type.TypeReference;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.base.Optional;
 import io.vertx.core.Vertx;
-import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.Disabled;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.ExtendWith;
@@ -60,11 +57,6 @@ import org.logl.vertx.LoglLogDelegateFactory;
 @ExtendWith(VertxExtension.class)
 class PatchworkIntegrationTest {
 
-  @BeforeAll
-  static void checkAvailable() {
-    assumeTrue(Sodium.isAvailable(), "Sodium native library is not available");
-  }
-
   public static class MyClientHandler implements ClientHandler {
 
     private final Consumer<Bytes> sender;
diff --git a/scuttlebutt-rpc/src/test/java/org/apache/tuweni/scuttlebutt/rpc/mux/PatchworkIntegrationTest.java b/scuttlebutt-rpc/src/test/java/org/apache/tuweni/scuttlebutt/rpc/mux/PatchworkIntegrationTest.java
index 293ac92..6581b40 100644
--- a/scuttlebutt-rpc/src/test/java/org/apache/tuweni/scuttlebutt/rpc/mux/PatchworkIntegrationTest.java
+++ b/scuttlebutt-rpc/src/test/java/org/apache/tuweni/scuttlebutt/rpc/mux/PatchworkIntegrationTest.java
@@ -1,8 +1,8 @@
 /*
- * Copyright 2019 ConsenSys AG.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
  *
  * http://www.apache.org/licenses/LICENSE-2.0
  *
@@ -10,7 +10,7 @@
  * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
  * specific language governing permissions and limitations under the License.
  */
-package net.consensys.cava.scuttlebutt.rpc.mux;
+package org.apache.tuweni.scuttlebutt.rpc.mux;
 
 /*
  * Copyright 2019 ConsenSys AG.
@@ -26,22 +26,20 @@ package net.consensys.cava.scuttlebutt.rpc.mux;
  */
 import static java.nio.charset.StandardCharsets.UTF_8;
 import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertFalse;
-
-import net.consensys.cava.bytes.Bytes;
-import net.consensys.cava.bytes.Bytes32;
-import net.consensys.cava.concurrent.AsyncResult;
-import net.consensys.cava.concurrent.CompletableAsyncResult;
-import net.consensys.cava.crypto.sodium.Signature;
-import net.consensys.cava.io.Base64;
-import net.consensys.cava.junit.VertxExtension;
-import net.consensys.cava.junit.VertxInstance;
-import net.consensys.cava.scuttlebutt.handshake.vertx.SecureScuttlebuttVertxClient;
-import net.consensys.cava.scuttlebutt.rpc.RPCAsyncRequest;
-import net.consensys.cava.scuttlebutt.rpc.RPCFunction;
-import net.consensys.cava.scuttlebutt.rpc.RPCMessage;
-import net.consensys.cava.scuttlebutt.rpc.RPCStreamRequest;
-import net.consensys.cava.scuttlebutt.rpc.mux.exceptions.ConnectionClosedException;
+
+import org.apache.tuweni.bytes.Bytes;
+import org.apache.tuweni.bytes.Bytes32;
+import org.apache.tuweni.concurrent.AsyncResult;
+import org.apache.tuweni.concurrent.CompletableAsyncResult;
+import org.apache.tuweni.crypto.sodium.Signature;
+import org.apache.tuweni.io.Base64;
+import org.apache.tuweni.junit.VertxExtension;
+import org.apache.tuweni.junit.VertxInstance;
+import org.apache.tuweni.scuttlebutt.handshake.vertx.SecureScuttlebuttVertxClient;
+import org.apache.tuweni.scuttlebutt.rpc.RPCAsyncRequest;
+import org.apache.tuweni.scuttlebutt.rpc.RPCFunction;
+import org.apache.tuweni.scuttlebutt.rpc.RPCResponse;
+import org.apache.tuweni.scuttlebutt.rpc.RPCStreamRequest;
 
 import java.io.BufferedWriter;
 import java.io.File;
@@ -78,27 +76,21 @@ public class PatchworkIntegrationTest {
 
     RPCHandler rpcHandler = makeRPCHandler(vertx);
 
-    List<AsyncResult<RPCMessage>> results = new ArrayList<>();
+    List<AsyncResult<RPCResponse>> results = new ArrayList<>();
 
     for (int i = 0; i < 10; i++) {
       RPCFunction function = new RPCFunction("whoami");
       RPCAsyncRequest asyncRequest = new RPCAsyncRequest(function, new ArrayList<>());
 
-      AsyncResult<RPCMessage> res = rpcHandler.makeAsyncRequest(asyncRequest);
+      AsyncResult<RPCResponse> res = rpcHandler.makeAsyncRequest(asyncRequest);
 
       results.add(res);
     }
 
-    AsyncResult<List<RPCMessage>> allResults = AsyncResult.combine(results);
-    List<RPCMessage> rpcMessages = allResults.get();
+    AsyncResult<List<RPCResponse>> allResults = AsyncResult.combine(results);
+    List<RPCResponse> rpcMessages = allResults.get();
 
     assertEquals(10, rpcMessages.size());
-
-    rpcMessages.forEach(msg -> {
-      assertFalse(msg.lastMessageOrError());
-
-    });
-
   }
 
 
@@ -158,7 +150,7 @@ public class PatchworkIntegrationTest {
     RPCHandler rpcHandler = makeRPCHandler(vertx);
 
 
-    List<AsyncResult<RPCMessage>> results = new ArrayList<>();
+    List<AsyncResult<RPCResponse>> results = new ArrayList<>();
 
     for (int i = 0; i < 20; i++) {
       // Note: in a real use case, this would more likely be a Java class with these fields
@@ -168,13 +160,13 @@ public class PatchworkIntegrationTest {
 
       RPCAsyncRequest asyncRequest = new RPCAsyncRequest(new RPCFunction("publish"), Arrays.asList(params));
 
-      AsyncResult<RPCMessage> rpcMessageAsyncResult = rpcHandler.makeAsyncRequest(asyncRequest);
+      AsyncResult<RPCResponse> rpcMessageAsyncResult = rpcHandler.makeAsyncRequest(asyncRequest);
 
       results.add(rpcMessageAsyncResult);
 
     }
 
-    List<RPCMessage> rpcMessages = AsyncResult.combine(results).get();
+    List<RPCResponse> rpcMessages = AsyncResult.combine(results).get();
 
     rpcMessages.forEach(msg -> System.out.println(msg.asString()));
   }
@@ -196,7 +188,7 @@ public class PatchworkIntegrationTest {
     AsyncResult<RPCHandler> onConnect =
         secureScuttlebuttVertxClient.connectTo(port, host, keyPair.publicKey(), (sender, terminationFn) -> {
 
-          return new RPCHandler(sender, terminationFn, new ObjectMapper(), loggerProvider);
+          return new RPCHandler(vertx, sender, terminationFn, new ObjectMapper(), loggerProvider);
         });
 
     return onConnect.get();
@@ -219,26 +211,23 @@ public class PatchworkIntegrationTest {
 
     RPCStreamRequest streamRequest = new RPCStreamRequest(new RPCFunction("createUserStream"), Arrays.asList(params));
 
-    try {
-      handler.openStream(streamRequest, (closeStream) -> new ScuttlebuttStreamHandler() {
-        @Override
-        public void onMessage(RPCMessage message) {
-          System.out.print(message.asString());
-        }
-
-        @Override
-        public void onStreamEnd() {
-          streamEnded.complete(null);
-        }
-
-        @Override
-        public void onStreamError(Exception ex) {
-
-        }
-      });
-    } catch (ConnectionClosedException e) {
-      throw e;
-    }
+    handler.openStream(streamRequest, (closeStream) -> new ScuttlebuttStreamHandler() {
+      @Override
+      public void onMessage(RPCResponse message) {
+        System.out.print(message.asString());
+      }
+
+      @Override
+      public void onStreamEnd() {
+        streamEnded.complete(null);
+      }
+
+      @Override
+      public void onStreamError(Exception ex) {
+
+        streamEnded.completeExceptionally(ex);
+      }
+    });
 
     // Wait until the stream is complete
     streamEnded.get();


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@tuweni.apache.org
For additional commands, e-mail: commits-help@tuweni.apache.org