You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tuweni.apache.org by to...@apache.org on 2019/04/23 19:36:35 UTC
[incubator-tuweni] 31/40: wip
This is an automated email from the ASF dual-hosted git repository.
toulmean pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-tuweni.git
commit fb27edaeeee3eaf999678b3c5202e2bebee4871b
Author: Antoine Toulme <to...@apache.org>
AuthorDate: Mon Apr 22 10:57:47 2019 -0700
wip
---
.../handshake/vertx/ClientHandlerFactory.java | 4 +-
.../vertx/SecureScuttlebuttVertxClient.java | 20 +-
.../vertx/SecureScuttlebuttVertxServer.java | 2 +-
.../handshake/vertx/VertxIntegrationTest.java | 3 +-
scuttlebutt-rpc/build.gradle | 2 +
.../tuweni/scuttlebutt/rpc/RPCAsyncRequest.java | 58 +++++
.../apache/tuweni/scuttlebutt/rpc/RPCCodec.java | 30 +++
.../tuweni/scuttlebutt/rpc/RPCErrorBody.java | 64 ++++++
.../apache/tuweni/scuttlebutt/rpc/RPCFunction.java | 52 +++++
.../apache/tuweni/scuttlebutt/rpc/RPCMessage.java | 44 +++-
.../tuweni/scuttlebutt/rpc/RPCRequestBody.java | 71 ++++++
.../tuweni/scuttlebutt/rpc/RPCRequestType.java | 35 +++
.../tuweni/scuttlebutt/rpc/RPCStreamRequest.java | 57 +++++
.../tuweni/scuttlebutt/rpc/mux/Multiplexer.java | 59 +++++
.../tuweni/scuttlebutt/rpc/mux/RPCHandler.java | 250 +++++++++++++++++++++
.../rpc/mux/ScuttlebuttStreamHandler.java | 41 ++++
.../mux/exceptions/ConnectionClosedException.java | 21 ++
.../scuttlebutt/rpc/PatchworkIntegrationTest.java | 1 +
.../tuweni/scuttlebutt/rpc/RPCEncodingTest.java | 3 +-
.../rpc/mux/PatchworkIntegrationTest.java | 249 ++++++++++++++++++++
20 files changed, 1045 insertions(+), 21 deletions(-)
diff --git a/scuttlebutt-handshake/src/main/java/org/apache/tuweni/scuttlebutt/handshake/vertx/ClientHandlerFactory.java b/scuttlebutt-handshake/src/main/java/org/apache/tuweni/scuttlebutt/handshake/vertx/ClientHandlerFactory.java
index e947f49..f047acd 100644
--- a/scuttlebutt-handshake/src/main/java/org/apache/tuweni/scuttlebutt/handshake/vertx/ClientHandlerFactory.java
+++ b/scuttlebutt-handshake/src/main/java/org/apache/tuweni/scuttlebutt/handshake/vertx/ClientHandlerFactory.java
@@ -19,7 +19,7 @@ import java.util.function.Consumer;
/**
* Factory creating stream handlers, managing client-side connections.
*/
-public interface ClientHandlerFactory {
+public interface ClientHandlerFactory<T extends ClientHandler> {
/**
* Creates a new handler associated with a valid streaming connection.
@@ -27,5 +27,5 @@ public interface ClientHandlerFactory {
* @param sender the function to send bytes to the server
* @param terminationFunction a function to terminate the stream properly
*/
- ClientHandler createHandler(Consumer<Bytes> sender, Runnable terminationFunction);
+ T createHandler(Consumer<Bytes> sender, Runnable terminationFunction);
}
diff --git a/scuttlebutt-handshake/src/main/java/org/apache/tuweni/scuttlebutt/handshake/vertx/SecureScuttlebuttVertxClient.java b/scuttlebutt-handshake/src/main/java/org/apache/tuweni/scuttlebutt/handshake/vertx/SecureScuttlebuttVertxClient.java
index c7f69e5..7ac177c 100644
--- a/scuttlebutt-handshake/src/main/java/org/apache/tuweni/scuttlebutt/handshake/vertx/SecureScuttlebuttVertxClient.java
+++ b/scuttlebutt-handshake/src/main/java/org/apache/tuweni/scuttlebutt/handshake/vertx/SecureScuttlebuttVertxClient.java
@@ -39,16 +39,16 @@ import org.logl.LoggerProvider;
*/
public final class SecureScuttlebuttVertxClient {
- private class NetSocketClientHandler {
+ private class NetSocketClientHandler<T extends ClientHandler> {
private final Logger logger;
private final NetSocket socket;
private final SecureScuttlebuttHandshakeClient handshakeClient;
- private final ClientHandlerFactory handlerFactory;
- private final CompletableAsyncResult<ClientHandler> completionHandle;
+ private final ClientHandlerFactory<T> handlerFactory;
+ private final CompletableAsyncResult<T> completionHandle;
private int handshakeCounter;
private SecureScuttlebuttStreamClient client;
- private ClientHandler handler;
+ private T handler;
private Bytes messageBuffer = Bytes.EMPTY;
@@ -56,8 +56,8 @@ public final class SecureScuttlebuttVertxClient {
Logger logger,
NetSocket socket,
Signature.PublicKey remotePublicKey,
- ClientHandlerFactory handlerFactory,
- CompletableAsyncResult<ClientHandler> completionHandle) {
+ ClientHandlerFactory<T> handlerFactory,
+ CompletableAsyncResult<T> completionHandle) {
this.logger = logger;
this.socket = socket;
this.handshakeClient = SecureScuttlebuttHandshakeClient.create(keyPair, networkIdentifier, remotePublicKey);
@@ -182,19 +182,19 @@ public final class SecureScuttlebuttVertxClient {
* @param handlerFactory the factory of handlers for connections
* @return a handle to a new stream handler with the remote host
*/
- public AsyncResult<ClientHandler> connectTo(
+ public <T extends ClientHandler> AsyncResult<T> connectTo(
int port,
String host,
Signature.PublicKey remotePublicKey,
- ClientHandlerFactory handlerFactory) {
+ ClientHandlerFactory<T> handlerFactory) {
client = vertx.createNetClient(new NetClientOptions().setTcpKeepAlive(true));
- CompletableAsyncResult<ClientHandler> completion = AsyncResult.incomplete();
+ CompletableAsyncResult<T> completion = AsyncResult.incomplete();
client.connect(port, host, res -> {
if (res.failed()) {
completion.completeExceptionally(res.cause());
} else {
NetSocket socket = res.result();
- new NetSocketClientHandler(
+ new NetSocketClientHandler<T>(
loggerProvider.getLogger(host + ":" + port),
socket,
remotePublicKey,
diff --git a/scuttlebutt-handshake/src/main/java/org/apache/tuweni/scuttlebutt/handshake/vertx/SecureScuttlebuttVertxServer.java b/scuttlebutt-handshake/src/main/java/org/apache/tuweni/scuttlebutt/handshake/vertx/SecureScuttlebuttVertxServer.java
index abd8b82..1e82956 100644
--- a/scuttlebutt-handshake/src/main/java/org/apache/tuweni/scuttlebutt/handshake/vertx/SecureScuttlebuttVertxServer.java
+++ b/scuttlebutt-handshake/src/main/java/org/apache/tuweni/scuttlebutt/handshake/vertx/SecureScuttlebuttVertxServer.java
@@ -113,8 +113,8 @@ public final class SecureScuttlebuttVertxServer {
}
}
} catch (HandshakeException | StreamException e) {
- e.printStackTrace();
netSocket.close();
+ throw e;
}
}
}
diff --git a/scuttlebutt-handshake/src/test/java/org/apache/tuweni/scuttlebutt/handshake/vertx/VertxIntegrationTest.java b/scuttlebutt-handshake/src/test/java/org/apache/tuweni/scuttlebutt/handshake/vertx/VertxIntegrationTest.java
index 526dba3..6b32dcb 100644
--- a/scuttlebutt-handshake/src/test/java/org/apache/tuweni/scuttlebutt/handshake/vertx/VertxIntegrationTest.java
+++ b/scuttlebutt-handshake/src/test/java/org/apache/tuweni/scuttlebutt/handshake/vertx/VertxIntegrationTest.java
@@ -120,8 +120,7 @@ class VertxIntegrationTest {
SecureScuttlebuttVertxClient client =
new SecureScuttlebuttVertxClient(provider, vertx, Signature.KeyPair.random(), networkIdentifier);
- MyClientHandler handler =
- (MyClientHandler) client.connectTo(20000, "0.0.0.0", serverKeyPair.publicKey(), MyClientHandler::new).get();
+ MyClientHandler handler = client.connectTo(20000, "0.0.0.0", serverKeyPair.publicKey(), MyClientHandler::new).get();
Thread.sleep(1000);
assertNotNull(handler);
diff --git a/scuttlebutt-rpc/build.gradle b/scuttlebutt-rpc/build.gradle
index b6f1339..cf71a72 100644
--- a/scuttlebutt-rpc/build.gradle
+++ b/scuttlebutt-rpc/build.gradle
@@ -5,6 +5,8 @@ dependencies {
compile project(':concurrent')
compile project(':crypto')
compile project(':scuttlebutt')
+ compile project(':scuttlebutt-handshake')
+ compile 'org.logl:logl-api'
compile 'com.fasterxml.jackson.core:jackson-databind'
diff --git a/scuttlebutt-rpc/src/main/java/org/apache/tuweni/scuttlebutt/rpc/RPCAsyncRequest.java b/scuttlebutt-rpc/src/main/java/org/apache/tuweni/scuttlebutt/rpc/RPCAsyncRequest.java
new file mode 100644
index 0000000..f4281d7
--- /dev/null
+++ b/scuttlebutt-rpc/src/main/java/org/apache/tuweni/scuttlebutt/rpc/RPCAsyncRequest.java
@@ -0,0 +1,58 @@
+/*
+ * Copyright 2019 ConsenSys AG.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package net.consensys.cava.scuttlebutt.rpc;
+
+import net.consensys.cava.bytes.Bytes;
+
+import java.util.List;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+public class RPCAsyncRequest {
+
+
+ private final RPCFunction function;
+ private final List<Object> arguments;
+
+ /**
+ *
+ * @param function the function to be invoked. If the function is in a namespace, the first n-1 items in the array
+ * are the namespace followed by the function name (e.g. 'blobs.get' becomes ['blobs', 'get']).
+ * @param arguments The arguments passed to the function being invoked. Each item can be any arbitrary object which is
+ * JSON serializable (e.g. String, Int, list, object.)
+ *
+ */
+ public RPCAsyncRequest(RPCFunction function, List<Object> arguments) {
+ this.function = function;
+ this.arguments = arguments;
+ }
+
+ /**
+ * Encode the RPC request as bytes.
+ *
+ * @param objectMapper the object mapper to serialize the request with
+ * @return an RPC request serialized into bytes
+ * @throws JsonProcessingException thrown if there is an error while serializing the request to bytes
+ */
+ public Bytes toEncodedRpcMessage(ObjectMapper objectMapper) throws JsonProcessingException {
+ return RPCCodec.encodeRequest(
+ new RPCRequestBody(function.asList(), RPCRequestType.ASYNC, arguments).asBytes(objectMapper),
+ getRPCFlags());
+ }
+
+ public RPCFlag[] getRPCFlags() {
+ return new RPCFlag[] {RPCFlag.BodyType.JSON};
+ }
+
+}
diff --git a/scuttlebutt-rpc/src/main/java/org/apache/tuweni/scuttlebutt/rpc/RPCCodec.java b/scuttlebutt-rpc/src/main/java/org/apache/tuweni/scuttlebutt/rpc/RPCCodec.java
index ca7223d..750dfd0 100644
--- a/scuttlebutt-rpc/src/main/java/org/apache/tuweni/scuttlebutt/rpc/RPCCodec.java
+++ b/scuttlebutt-rpc/src/main/java/org/apache/tuweni/scuttlebutt/rpc/RPCCodec.java
@@ -18,6 +18,9 @@ import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.atomic.AtomicInteger;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+
/**
* Encoder responsible for encoding requests.
* <p>
@@ -27,6 +30,8 @@ public final class RPCCodec {
static final AtomicInteger counter = new AtomicInteger(1);
+ private static ObjectMapper mapper = new ObjectMapper();
+
private static int nextRequestNumber() {
int requestNumber = counter.getAndIncrement();
if (requestNumber < 1) {
@@ -82,6 +87,19 @@ public final class RPCCodec {
}
/**
+ * Encode a message as an RPC request.
+ *
+ * @param body the body to encode as an RPC request
+ * @param requestNumber the request number
+ * @param flags the flags of the RPC request (already encoded.)
+ * @return the message encoded as an RPC request
+ */
+ public static Bytes encodeRequest(Bytes body, int requestNumber, byte flags) {
+ return Bytes
+ .concatenate(Bytes.of(flags), Bytes.ofUnsignedInt(body.size()), Bytes.ofUnsignedInt(requestNumber), body);
+ }
+
+ /**
* Encode a message as a response to a RPC request.
*
* @param body the body to encode as the body of the response
@@ -117,6 +135,18 @@ public final class RPCCodec {
}
/**
+ * Encodes a message with the body and headers set in the appropriate way to end a stream.
+ *
+ * @return the response encoded as an RPC request
+ * @throws JsonProcessingException if the end-of-stream body cannot be serialized to JSON
+ */
+ public static Bytes encodeStreamEndRequest(int requestNumber) throws JsonProcessingException {
+ Boolean bool = Boolean.TRUE;
+ byte[] bytes = mapper.writeValueAsBytes(bool);
+ return encodeRequest(Bytes.wrap(bytes), requestNumber, RPCFlag.EndOrError.END);
+ }
+
+ /**
* Encode a message as a response to a RPC request.
*
* @param body the body to encode as the body of the response
diff --git a/scuttlebutt-rpc/src/main/java/org/apache/tuweni/scuttlebutt/rpc/RPCErrorBody.java b/scuttlebutt-rpc/src/main/java/org/apache/tuweni/scuttlebutt/rpc/RPCErrorBody.java
new file mode 100644
index 0000000..87fa5b6
--- /dev/null
+++ b/scuttlebutt-rpc/src/main/java/org/apache/tuweni/scuttlebutt/rpc/RPCErrorBody.java
@@ -0,0 +1,64 @@
+/*
+ * Copyright 2019 ConsenSys AG.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package net.consensys.cava.scuttlebutt.rpc;
+
+/**
+ * An RPC message response body which contains an error
+ */
+public class RPCErrorBody {
+
+ private String name;
+ private String message;
+ private String stack;
+
+ public RPCErrorBody() {
+
+ }
+
+ /**
+ * A description of an error that occurred while performing an RPC request.
+ *
+ * @param name the name of the error type
+ * @param message the message describing the error
+ * @param stack the stack trace from the error
+ */
+ public RPCErrorBody(String name, String message, String stack) {
+ this.name = name;
+ this.message = message;
+ this.stack = stack;
+ }
+
+ public String getName() {
+ return name;
+ }
+
+ public void setName(String name) {
+ this.name = name;
+ }
+
+ public String getMessage() {
+ return message;
+ }
+
+ public void setMessage(String message) {
+ this.message = message;
+ }
+
+ public String getStack() {
+ return stack;
+ }
+
+ public void setStack(String stack) {
+ this.stack = stack;
+ }
+}
diff --git a/scuttlebutt-rpc/src/main/java/org/apache/tuweni/scuttlebutt/rpc/RPCFunction.java b/scuttlebutt-rpc/src/main/java/org/apache/tuweni/scuttlebutt/rpc/RPCFunction.java
new file mode 100644
index 0000000..511a491
--- /dev/null
+++ b/scuttlebutt-rpc/src/main/java/org/apache/tuweni/scuttlebutt/rpc/RPCFunction.java
@@ -0,0 +1,52 @@
+/*
+ * Copyright 2019 ConsenSys AG.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package net.consensys.cava.scuttlebutt.rpc;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * A scuttlebutt RPC function namespace and name representation.
+ */
+public class RPCFunction {
+
+ private final List<String> namespace;
+ private final String functionName;
+
+ /**
+ *
+ * @param namespace the namespace of the function (e.g. ['blobs']). May be empty if there is no namespace for the
+ * function.
+ * @param functionName the function (e.g. 'add'.)
+ */
+ public RPCFunction(List<String> namespace, String functionName) {
+ this.namespace = namespace;
+ this.functionName = functionName;
+ }
+
+ public RPCFunction(String functionName) {
+ this.namespace = new ArrayList<>();
+ this.functionName = functionName;
+ }
+
+ /**
+ * @return The list representation of the namespace and function call.
+ */
+ public List<String> asList() {
+ List<String> list = new ArrayList<>();
+ list.addAll(namespace);
+ list.add(functionName);
+ return list;
+ }
+
+}
diff --git a/scuttlebutt-rpc/src/main/java/org/apache/tuweni/scuttlebutt/rpc/RPCMessage.java b/scuttlebutt-rpc/src/main/java/org/apache/tuweni/scuttlebutt/rpc/RPCMessage.java
index a82f1fc..2f21645 100644
--- a/scuttlebutt-rpc/src/main/java/org/apache/tuweni/scuttlebutt/rpc/RPCMessage.java
+++ b/scuttlebutt-rpc/src/main/java/org/apache/tuweni/scuttlebutt/rpc/RPCMessage.java
@@ -19,14 +19,13 @@ import org.apache.tuweni.bytes.Bytes;
import java.io.IOException;
import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Optional;
/**
* Decoded RPC message, making elements of the message available directly.
*/
public final class RPCMessage {
- private static final ObjectMapper mapper = new ObjectMapper();
-
private final byte rpcFlags;
private final boolean stream;
private final boolean lastMessageOrError;
@@ -82,6 +81,40 @@ public final class RPCMessage {
}
/**
+ *
+ * @return true if this is the last message in a stream, and it is not an error
+ */
+ public boolean isSuccessfulLastMessage() {
+ return lastMessageOrError() && asString().equals("true");
+ }
+
+ /**
+ *
+ * @return true if this is an error message response
+ */
+ public boolean isErrorMessage() {
+ return lastMessageOrError && !isSuccessfulLastMessage();
+ }
+
+ /**
+ * @param objectMapper the object mapper to deserialize with
+ * @return the RPC error response body, if this is an error response - nothing otherwise
+ */
+ public Optional<RPCErrorBody> getErrorBody(ObjectMapper objectMapper) {
+
+ if (!isErrorMessage()) {
+ // If the body of the response is 'true' or the error flag isn't set, it's a successful end condition
+ return Optional.absent();
+ } else {
+ try {
+ return Optional.of(asJSON(objectMapper, RPCErrorBody.class));
+ } catch (IOException e) {
+ return Optional.absent();
+ }
+ }
+ }
+
+ /**
* Provides the type of the body of the message: a binary message, a UTF-8 string or a JSON message.
*
* @return the type of the body: a binary message, a UTF-8 string or a JSON message
@@ -128,13 +161,14 @@ public final class RPCMessage {
/**
* Provides the body of the message, marshalled as a JSON object.
- *
+ *
+ * @param objectMapper the object mapper to deserialize with
* @param clazz the JSON object class
* @param <T> the matching JSON object class
* @return a new instance of the JSON object class
* @throws IOException if an error occurs during marshalling
*/
- public <T> T asJSON(Class<T> clazz) throws IOException {
- return mapper.readerFor(clazz).readValue(body().toArrayUnsafe());
+ public <T> T asJSON(ObjectMapper objectMapper, Class<T> clazz) throws IOException {
+ return objectMapper.readerFor(clazz).readValue(body().toArrayUnsafe());
}
}
diff --git a/scuttlebutt-rpc/src/main/java/org/apache/tuweni/scuttlebutt/rpc/RPCRequestBody.java b/scuttlebutt-rpc/src/main/java/org/apache/tuweni/scuttlebutt/rpc/RPCRequestBody.java
new file mode 100644
index 0000000..70ad4e3
--- /dev/null
+++ b/scuttlebutt-rpc/src/main/java/org/apache/tuweni/scuttlebutt/rpc/RPCRequestBody.java
@@ -0,0 +1,71 @@
+/*
+ * Copyright 2019 ConsenSys AG.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package net.consensys.cava.scuttlebutt.rpc;
+
+import net.consensys.cava.bytes.Bytes;
+
+import java.util.List;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+/**
+ * The request payload of an RPC request to another node. The fields are as specified in the scuttlebutt protocol docs
+ */
+public class RPCRequestBody {
+
+ private final List<String> name;
+ private final RPCRequestType type;
+ private final List<Object> args;
+
+ /**
+ *
+ * @param name the function to be invoked. If the function is in a namespace, the first n-1 items in the array are
+ * the namespace followed by the function name (e.g. 'blobs.get' becomes ['blobs', 'get']). If the function is
+ * not in a namespace, it is an array with one item (e.g. ['createFeedStream']).
+ * @param type the type of the request (e.g. stream or async.)
+ * @param args The args passed to the function being invoked. Each item can be any arbitrary object which is JSON
+ * serializable (e.g. String, Int, list, object.)
+ */
+ public RPCRequestBody(List<String> name, RPCRequestType type, List<Object> args) {
+ this.name = name;
+ this.type = type;
+ this.args = args;
+ }
+
+ public List<String> getName() {
+ return name;
+ }
+
+ public RPCRequestType getType() {
+ return type;
+ }
+
+ public List<Object> getArgs() {
+ return args;
+ }
+
+ /**
+ *
+ * @param objectMapper the object mapper to serialize to bytes with
+ * @return the bytes representation of this RPC request body. The request is first encoded into JSON, then from JSON
+ * to a byte array
+ * @throws JsonProcessingException thrown if there is a problem transforming the object to JSON.
+ */
+ public Bytes asBytes(ObjectMapper objectMapper) throws JsonProcessingException {
+ byte[] bytes = objectMapper.writeValueAsBytes(this);
+ return Bytes.wrap(bytes);
+ }
+
+
+}
diff --git a/scuttlebutt-rpc/src/main/java/org/apache/tuweni/scuttlebutt/rpc/RPCRequestType.java b/scuttlebutt-rpc/src/main/java/org/apache/tuweni/scuttlebutt/rpc/RPCRequestType.java
new file mode 100644
index 0000000..9703da4
--- /dev/null
+++ b/scuttlebutt-rpc/src/main/java/org/apache/tuweni/scuttlebutt/rpc/RPCRequestType.java
@@ -0,0 +1,35 @@
+/*
+ * Copyright 2019 ConsenSys AG.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package net.consensys.cava.scuttlebutt.rpc;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+/**
+ * The available types of Scuttlebutt RPC requests.
+ */
+public enum RPCRequestType {
+
+ /**
+ * An 'async' request, which returns one result some time in the future.
+ */
+ @JsonProperty("async")
+ ASYNC,
+
+ /**
+ * A 'source' type request, which begins a stream of results
+ */
+ @JsonProperty("source")
+ SOURCE
+
+
+}
diff --git a/scuttlebutt-rpc/src/main/java/org/apache/tuweni/scuttlebutt/rpc/RPCStreamRequest.java b/scuttlebutt-rpc/src/main/java/org/apache/tuweni/scuttlebutt/rpc/RPCStreamRequest.java
new file mode 100644
index 0000000..ff198da
--- /dev/null
+++ b/scuttlebutt-rpc/src/main/java/org/apache/tuweni/scuttlebutt/rpc/RPCStreamRequest.java
@@ -0,0 +1,57 @@
+/*
+ * Copyright 2019 ConsenSys AG.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package net.consensys.cava.scuttlebutt.rpc;
+
+import net.consensys.cava.bytes.Bytes;
+
+import java.util.List;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+/**
+ * A request which returns a 'source' type result (e.g. opens up a stream that is followed by the request ID.)
+ */
+public class RPCStreamRequest {
+
+ private final RPCFunction function;
+ private final List<Object> arguments;
+
+ /**
+ * The details for the function (the name of the function and its arguments.)
+ *
+ * @param function the function to be invoked
+ * @param arguments the arguments for the function (can be any arbitrary class which can be marshalled into JSON.)
+ */
+ public RPCStreamRequest(RPCFunction function, List<Object> arguments) {
+ this.function = function;
+ this.arguments = arguments;
+ }
+
+ /**
+ * @return The byte representation for the request after it is marshalled into a JSON string.
+ * @throws JsonProcessingException if an error was thrown while marshalling to JSON
+ */
+ public Bytes toEncodedRpcMessage(ObjectMapper mapper) throws JsonProcessingException {
+ RPCRequestBody body = new RPCRequestBody(function.asList(), RPCRequestType.SOURCE, arguments);
+ return RPCCodec.encodeRequest(body.asBytes(mapper), getRPCFlags());
+ }
+
+ /**
+ * @return The correct RPC flags for a stream request
+ */
+ public RPCFlag[] getRPCFlags() {
+ return new RPCFlag[] {RPCFlag.Stream.STREAM, RPCFlag.BodyType.JSON};
+ }
+
+}
diff --git a/scuttlebutt-rpc/src/main/java/org/apache/tuweni/scuttlebutt/rpc/mux/Multiplexer.java b/scuttlebutt-rpc/src/main/java/org/apache/tuweni/scuttlebutt/rpc/mux/Multiplexer.java
new file mode 100644
index 0000000..722dcbf
--- /dev/null
+++ b/scuttlebutt-rpc/src/main/java/org/apache/tuweni/scuttlebutt/rpc/mux/Multiplexer.java
@@ -0,0 +1,59 @@
+/*
+ * Copyright 2019 ConsenSys AG.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package net.consensys.cava.scuttlebutt.rpc.mux;
+
+import net.consensys.cava.concurrent.AsyncResult;
+import net.consensys.cava.scuttlebutt.rpc.RPCAsyncRequest;
+import net.consensys.cava.scuttlebutt.rpc.RPCMessage;
+import net.consensys.cava.scuttlebutt.rpc.RPCStreamRequest;
+import net.consensys.cava.scuttlebutt.rpc.mux.exceptions.ConnectionClosedException;
+
+import java.util.function.Function;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+
+/**
+ * Multiplexes asynchronous requests and streams across a connection to a node. Handles multiple active requests and
+ * streams across one connection.
+ */
+public interface Multiplexer {
+
+ /**
+ * Issue an 'async' type request to a node, which will eventually return a result from the node.
+ *
+ * @param request the request details
+ *
+ * @return an async result which will be completed with the result or an error if the request fails.
+ */
+ AsyncResult<RPCMessage> makeAsyncRequest(RPCAsyncRequest request);
+
+ /**
+ * Creates a request which opens a stream (e.g. a 'source' in the protocol docs.)
+ *
+ * @param request the request details
+ * @param streamFactory a function which takes a 'Runnable' which closes the stream when ran, and returns a stream
+ * handler to pass messages to
+ *
+ * @throws JsonProcessingException if the request cannot be serialized to JSON
+ */
+ void openStream(RPCStreamRequest request, Function<Runnable, ScuttlebuttStreamHandler> streamFactory)
+ throws JsonProcessingException,
+ ConnectionClosedException;
+
+
+ /**
+ * Close the underlying connection
+ */
+ void close();
+
+}
diff --git a/scuttlebutt-rpc/src/main/java/org/apache/tuweni/scuttlebutt/rpc/mux/RPCHandler.java b/scuttlebutt-rpc/src/main/java/org/apache/tuweni/scuttlebutt/rpc/mux/RPCHandler.java
new file mode 100644
index 0000000..0ac4583
--- /dev/null
+++ b/scuttlebutt-rpc/src/main/java/org/apache/tuweni/scuttlebutt/rpc/mux/RPCHandler.java
@@ -0,0 +1,250 @@
+/*
+ * Copyright 2019 ConsenSys AG.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package net.consensys.cava.scuttlebutt.rpc.mux;
+
+import net.consensys.cava.bytes.Bytes;
+import net.consensys.cava.concurrent.AsyncResult;
+import net.consensys.cava.concurrent.CompletableAsyncResult;
+import net.consensys.cava.scuttlebutt.handshake.vertx.ClientHandler;
+import net.consensys.cava.scuttlebutt.rpc.RPCAsyncRequest;
+import net.consensys.cava.scuttlebutt.rpc.RPCCodec;
+import net.consensys.cava.scuttlebutt.rpc.RPCErrorBody;
+import net.consensys.cava.scuttlebutt.rpc.RPCFlag;
+import net.consensys.cava.scuttlebutt.rpc.RPCMessage;
+import net.consensys.cava.scuttlebutt.rpc.RPCStreamRequest;
+import net.consensys.cava.scuttlebutt.rpc.mux.exceptions.ConnectionClosedException;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.function.Consumer;
+import java.util.function.Function;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Optional;
+import org.logl.Logger;
+import org.logl.LoggerProvider;
+
+/**
+ * Handles RPC requests and responses from an active connection to a scuttlebutt node
+ *
+ * Note: the public methods on this class are synchronized so that a request is rejected if the connection has been
+ * closed before it begins and any 'in flight' requests are ended exceptionally with a 'connection closed' error without
+ * new incoming requests being added to the maps by threads.
+ *
+ * In the future, we could perhaps make the locking more fine-grained (with care) if we require a high degree of
+ * concurrency.
+ *
+ */
+public class RPCHandler implements Multiplexer, ClientHandler {
+
+ private final Consumer<Bytes> messageSender;
+ private final Logger logger;
+ private final Runnable connectionCloser;
+ private final ObjectMapper objectMapper;
+
+ private Map<Integer, CompletableAsyncResult<RPCMessage>> awaitingAsyncResponse = new HashMap<>();
+ private Map<Integer, ScuttlebuttStreamHandler> streams = new HashMap<>();
+
+ private boolean closed;
+
+ /**
+  * Creates a handler that issues RPC requests over an already established connection.
+  *
+  * @param messageSender sends the encoded request bytes to the node
+  * @param terminationFn closes the connection when run
+  * @param objectMapper the objectMapper to serialize and deserialize message request and response bodies
+  * @param logger provider from which this handler obtains its "rpc handler" logger
+  */
+ public RPCHandler(
+     Consumer<Bytes> messageSender,
+     Runnable terminationFn,
+     ObjectMapper objectMapper,
+     LoggerProvider logger) {
+   this.logger = logger.getLogger("rpc handler");
+   this.objectMapper = objectMapper;
+   this.connectionCloser = terminationFn;
+   this.messageSender = messageSender;
+
+   // The handler starts open; it is only flagged closed by streamClosed().
+   this.closed = false;
+ }
+
+ /**
+  * Issues an 'async' type request to the node and returns a result that is completed when the matching response
+  * arrives.
+  *
+  * If the connection is already closed, the returned result is failed with a {@link ConnectionClosedException} and
+  * nothing is sent. If a {@link JsonProcessingException} is thrown while encoding the request, the returned result is
+  * failed with it and the request is neither registered nor sent.
+  *
+  * @param request the request details
+  * @return an async result completed with the response, or completed exceptionally as described above
+  */
+ @Override
+ public synchronized AsyncResult<RPCMessage> makeAsyncRequest(RPCAsyncRequest request) {
+
+   CompletableAsyncResult<RPCMessage> result = AsyncResult.incomplete();
+
+   if (closed) {
+     // Stop here: without this return the request would still be registered and written to a closed connection.
+     result.completeExceptionally(new ConnectionClosedException());
+     return result;
+   }
+
+   try {
+     RPCMessage message = new RPCMessage(request.toEncodedRpcMessage(objectMapper));
+     int requestNumber = message.requestNumber();
+     Bytes bytes = RPCCodec.encodeRequest(message.body(), requestNumber, request.getRPCFlags());
+
+     // Register only once encoding has succeeded, so a failed encode leaves no stale entry behind, and before
+     // sending, so the response always finds its pending result.
+     awaitingAsyncResponse.put(requestNumber, result);
+     messageSender.accept(bytes);
+
+   } catch (JsonProcessingException e) {
+     result.completeExceptionally(e);
+   }
+
+   return result;
+ }
+
+ /**
+  * Sends a stream-opening request and registers, under the request number, the handler that {@code responseSink}
+  * builds.
+  *
+  * The {@code Runnable} handed to {@code responseSink} sends a stream end request for this request number when run.
+  *
+  * @param request the request details
+  * @param responseSink builds the stream handler from the stream-closing {@code Runnable}
+  * @throws JsonProcessingException propagated from encoding the request
+  * @throws ConnectionClosedException if the handler has already been marked closed
+  */
+ @Override
+ public synchronized void openStream(
+     RPCStreamRequest request,
+     Function<Runnable, ScuttlebuttStreamHandler> responseSink) throws JsonProcessingException,
+     ConnectionClosedException {
+
+   if (closed) {
+     throw new ConnectionClosedException();
+   }
+
+   RPCFlag[] flags = request.getRPCFlags();
+   RPCMessage encoded = new RPCMessage(request.toEncodedRpcMessage(objectMapper));
+   int requestNumber = encoded.requestNumber();
+
+   messageSender.accept(RPCCodec.encodeRequest(encoded.body(), requestNumber, flags));
+
+   // Handed to the caller's factory so the stream can be ended from the consumer side.
+   Runnable endStream = () -> {
+     try {
+       Bytes endRequest = RPCCodec.encodeStreamEndRequest(requestNumber);
+       messageSender.accept(endRequest);
+     } catch (JsonProcessingException e) {
+       logger.warn("Unexpectedly could not encode stream end message to JSON.");
+     }
+   };
+
+   streams.put(requestNumber, responseSink.apply(endStream));
+ }
+
+ /**
+ * Runs the connection-closing function that was supplied to the constructor.
+ *
+ * This method does not itself set the closed flag or fail outstanding requests; in this class that only happens in
+ * streamClosed().
+ */
+ @Override
+ public synchronized void close() {
+ connectionCloser.run();
+ }
+
+ @Override
+ public synchronized void receivedMessage(Bytes message) {
+
+ RPCMessage rpcMessage = new RPCMessage(message);
+
+ // A negative request number indicates that this is a response, rather than a request that this node
+ // should service
+ if (rpcMessage.requestNumber() < 0) {
+ handleResponse(rpcMessage);
+ } else {
+ handleRequest(rpcMessage);
+ }
+
+ }
+
+ /**
+  * Placeholder for servicing requests made by the remote node: currently it only logs a warning.
+  *
+  * @param rpcMessage the incoming request
+  */
+ private void handleRequest(RPCMessage rpcMessage) {
+   // Not yet implemented
+   String description = rpcMessage.asString();
+   logger.warn("Received incoming request, but we do not yet handle any requests: " + description);
+ }
+
+ /**
+  * Routes a response either to the stream handler or to the pending async result registered for its request.
+  *
+  * @param response the decoded response (its request number is negative)
+  */
+ private void handleResponse(RPCMessage response) {
+   // The maps are keyed by the positive number of the original request; responses carry it negated.
+   int requestNumber = -response.requestNumber();
+
+   if (logger.isDebugEnabled()) {
+     logger.debug("Incoming response: " + response.asString());
+   }
+
+   if (RPCFlag.Stream.STREAM.isApplied(response.rpcFlags())) {
+     handleStreamResponse(requestNumber, response);
+   } else {
+     handleAsyncResponse(requestNumber, response);
+   }
+ }
+
+ /** Delivers a stream response (message, end of stream, or error) to the handler registered for the request. */
+ private void handleStreamResponse(int requestNumber, RPCMessage response) {
+   ScuttlebuttStreamHandler streamHandler = streams.get(requestNumber);
+
+   if (streamHandler == null) {
+     logger.warn(
+         "Couldn't find stream handler for RPC response with request number "
+             + requestNumber
+             + " "
+             + response.asString());
+     return;
+   }
+
+   if (response.isSuccessfulLastMessage()) {
+     streams.remove(requestNumber);
+     streamHandler.onStreamEnd();
+   } else if (response.isErrorMessage()) {
+     // An error terminates the stream, so drop the handler as well; otherwise it would stay registered forever.
+     streams.remove(requestNumber);
+
+     Optional<RPCErrorBody> errorBody = response.getErrorBody(objectMapper);
+
+     // Falling back to the whole body shouldn't happen, but for safety it is used as the exception message if the
+     // error body could not be unmarshalled for whatever reason.
+     String description = errorBody.isPresent() ? errorBody.get().getMessage() : response.asString();
+     streamHandler.onStreamError(new Exception(description));
+   } else {
+     streamHandler.onMessage(response);
+   }
+ }
+
+ /** Completes (and forgets) the pending async result registered for the request, if there is one. */
+ private void handleAsyncResponse(int requestNumber, RPCMessage response) {
+   CompletableAsyncResult<RPCMessage> pending = awaitingAsyncResponse.remove(requestNumber);
+
+   if (pending == null) {
+     logger.warn(
+         "Couldn't find async handler for RPC response with request number "
+             + requestNumber
+             + " "
+             + response.asString());
+     return;
+   }
+
+   pending.complete(response);
+ }
+
+ @Override
+ public void streamClosed() {
+ this.closed = true;
+
+ streams.forEach((key, streamHandler) -> {
+ streamHandler.onStreamError(new ConnectionClosedException());
+ });
+
+ streams.clear();
+
+ awaitingAsyncResponse.forEach((key, value) -> {
+ if (!value.isDone()) {
+ value.completeExceptionally(new ConnectionClosedException());
+ }
+
+ });
+
+
+ }
+}
diff --git a/scuttlebutt-rpc/src/main/java/org/apache/tuweni/scuttlebutt/rpc/mux/ScuttlebuttStreamHandler.java b/scuttlebutt-rpc/src/main/java/org/apache/tuweni/scuttlebutt/rpc/mux/ScuttlebuttStreamHandler.java
new file mode 100644
index 0000000..d108663
--- /dev/null
+++ b/scuttlebutt-rpc/src/main/java/org/apache/tuweni/scuttlebutt/rpc/mux/ScuttlebuttStreamHandler.java
@@ -0,0 +1,41 @@
+/*
+ * Copyright 2019 ConsenSys AG.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package net.consensys.cava.scuttlebutt.rpc.mux;
+
+import net.consensys.cava.scuttlebutt.rpc.RPCMessage;
+
+/**
+ * Handles incoming items from a result stream
+ */
+public interface ScuttlebuttStreamHandler {
+
+ /**
+ * Handles a new message from the result stream.
+ *
+ * @param message the message received on the stream
+ */
+ void onMessage(RPCMessage message);
+
+ /**
+ * Invoked when the stream has been closed. RPCHandler calls this when it receives a successful last message for the
+ * stream.
+ */
+ void onStreamEnd();
+
+ /**
+ * Invoked when there is an error in the stream. RPCHandler calls this for an error response on the stream and when
+ * the connection is closed while the stream is still registered.
+ *
+ * @param ex the underlying error
+ */
+ void onStreamError(Exception ex);
+
+}
diff --git a/scuttlebutt-rpc/src/main/java/org/apache/tuweni/scuttlebutt/rpc/mux/exceptions/ConnectionClosedException.java b/scuttlebutt-rpc/src/main/java/org/apache/tuweni/scuttlebutt/rpc/mux/exceptions/ConnectionClosedException.java
new file mode 100644
index 0000000..160946c
--- /dev/null
+++ b/scuttlebutt-rpc/src/main/java/org/apache/tuweni/scuttlebutt/rpc/mux/exceptions/ConnectionClosedException.java
@@ -0,0 +1,21 @@
+/*
+ * Copyright 2019 ConsenSys AG.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package net.consensys.cava.scuttlebutt.rpc.mux.exceptions;
+
+/**
+ * Checked exception signalling that an operation could not be carried out because the connection is closed.
+ */
+public class ConnectionClosedException extends Exception {
+
+ /**
+ * Creates the exception with the fixed message "Connection is closed."
+ */
+ public ConnectionClosedException() {
+ super("Connection is closed.");
+ }
+
+}
diff --git a/scuttlebutt-rpc/src/test/java/org/apache/tuweni/scuttlebutt/rpc/PatchworkIntegrationTest.java b/scuttlebutt-rpc/src/test/java/org/apache/tuweni/scuttlebutt/rpc/PatchworkIntegrationTest.java
index 63b64a8..3178fae 100644
--- a/scuttlebutt-rpc/src/test/java/org/apache/tuweni/scuttlebutt/rpc/PatchworkIntegrationTest.java
+++ b/scuttlebutt-rpc/src/test/java/org/apache/tuweni/scuttlebutt/rpc/PatchworkIntegrationTest.java
@@ -188,4 +188,5 @@ class PatchworkIntegrationTest {
secureScuttlebuttVertxClient.stop().join();
}
+
}
diff --git a/scuttlebutt-rpc/src/test/java/org/apache/tuweni/scuttlebutt/rpc/RPCEncodingTest.java b/scuttlebutt-rpc/src/test/java/org/apache/tuweni/scuttlebutt/rpc/RPCEncodingTest.java
index 10966cf..912c1b5 100644
--- a/scuttlebutt-rpc/src/test/java/org/apache/tuweni/scuttlebutt/rpc/RPCEncodingTest.java
+++ b/scuttlebutt-rpc/src/test/java/org/apache/tuweni/scuttlebutt/rpc/RPCEncodingTest.java
@@ -21,6 +21,7 @@ import org.apache.tuweni.bytes.Bytes;
import java.nio.charset.StandardCharsets;
+import com.fasterxml.jackson.databind.ObjectMapper;
import org.junit.jupiter.api.Test;
class RPCEncodingTest {
@@ -44,7 +45,7 @@ class RPCEncodingTest {
RPCFlag.Stream.STREAM);
RPCMessage decoded = new RPCMessage(message);
assertTrue(decoded.stream());
- assertEquals("some JSON string", decoded.asJSON(String.class));
+ assertEquals("some JSON string", decoded.asJSON(new ObjectMapper(), String.class));
assertEquals(RPCFlag.BodyType.JSON, decoded.bodyType());
assertEquals(RPCCodec.counter.get() - 1, decoded.requestNumber());
}
diff --git a/scuttlebutt-rpc/src/test/java/org/apache/tuweni/scuttlebutt/rpc/mux/PatchworkIntegrationTest.java b/scuttlebutt-rpc/src/test/java/org/apache/tuweni/scuttlebutt/rpc/mux/PatchworkIntegrationTest.java
new file mode 100644
index 0000000..293ac92
--- /dev/null
+++ b/scuttlebutt-rpc/src/test/java/org/apache/tuweni/scuttlebutt/rpc/mux/PatchworkIntegrationTest.java
@@ -0,0 +1,249 @@
+/*
+ * Copyright 2019 ConsenSys AG.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package net.consensys.cava.scuttlebutt.rpc.mux;
+
+/*
+ * Copyright 2019 ConsenSys AG.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+
+import net.consensys.cava.bytes.Bytes;
+import net.consensys.cava.bytes.Bytes32;
+import net.consensys.cava.concurrent.AsyncResult;
+import net.consensys.cava.concurrent.CompletableAsyncResult;
+import net.consensys.cava.crypto.sodium.Signature;
+import net.consensys.cava.io.Base64;
+import net.consensys.cava.junit.VertxExtension;
+import net.consensys.cava.junit.VertxInstance;
+import net.consensys.cava.scuttlebutt.handshake.vertx.SecureScuttlebuttVertxClient;
+import net.consensys.cava.scuttlebutt.rpc.RPCAsyncRequest;
+import net.consensys.cava.scuttlebutt.rpc.RPCFunction;
+import net.consensys.cava.scuttlebutt.rpc.RPCMessage;
+import net.consensys.cava.scuttlebutt.rpc.RPCStreamRequest;
+import net.consensys.cava.scuttlebutt.rpc.mux.exceptions.ConnectionClosedException;
+
+import java.io.BufferedWriter;
+import java.io.File;
+import java.io.OutputStreamWriter;
+import java.io.PrintWriter;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Scanner;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Optional;
+import io.vertx.core.Vertx;
+import org.junit.jupiter.api.Disabled;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.logl.Level;
+import org.logl.LoggerProvider;
+import org.logl.logl.SimpleLogger;
+import org.logl.vertx.LoglLogDelegateFactory;
+
+@ExtendWith(VertxExtension.class)
+public class PatchworkIntegrationTest {
+
+ LoggerProvider loggerProvider = SimpleLogger.withLogLevel(Level.DEBUG).toPrintWriter(
+ new PrintWriter(new BufferedWriter(new OutputStreamWriter(System.out, UTF_8))));
+
+ /**
+  * Fires ten 'whoami' async requests over one connection and checks that each gets a response that is not flagged
+  * as last-message-or-error.
+  *
+  * NOTE(review): presumably disabled because it needs a node listening on localhost:8008 and a local secret file
+  * (see makeRPCHandler / getLocalKeys) - confirm.
+  */
+ @Test
+ @Disabled
+ public void testWithPatchwork(@VertxInstance Vertx vertx) throws Exception {
+   RPCHandler rpcHandler = makeRPCHandler(vertx);
+
+   List<AsyncResult<RPCMessage>> pending = new ArrayList<>();
+   for (int i = 0; i < 10; i++) {
+     RPCAsyncRequest whoami = new RPCAsyncRequest(new RPCFunction("whoami"), new ArrayList<>());
+     pending.add(rpcHandler.makeAsyncRequest(whoami));
+   }
+
+   // Block until every request has been answered.
+   AsyncResult<List<RPCMessage>> combined = AsyncResult.combine(pending);
+   List<RPCMessage> responses = combined.get();
+
+   assertEquals(10, responses.size());
+
+   for (RPCMessage response : responses) {
+     assertFalse(response.lastMessageOrError());
+   }
+ }
+
+
+ // TODO: Move this to a utility class that all the scuttlebutt modules' tests can use.
+ /**
+  * Loads the local key pair from the "secret" file of the ssb directory.
+  *
+  * The directory is taken from the "ssb_dir" environment variable, falling back to ".ssb" under the "user.home"
+  * system property. Lines of the file starting with '#' are skipped and the remainder is parsed as a JSON object
+  * with "public" and "private" entries.
+  *
+  * @return the key pair built from the decoded "public" and "private" values
+  * @throws Exception if no directory can be determined, the secret file does not exist, it lacks one of the two
+  *         entries, or it cannot be read or parsed
+  */
+ private Signature.KeyPair getLocalKeys() throws Exception {
+   Optional<String> ssbDir = Optional.fromNullable(System.getenv().get("ssb_dir"));
+   Optional<String> homePath =
+       Optional.fromNullable(System.getProperty("user.home")).transform(home -> home + "/.ssb");
+
+   Optional<String> path = ssbDir.or(homePath);
+
+   if (!path.isPresent()) {
+     throw new Exception("Cannot find ssb directory config value");
+   }
+
+   String secretPath = path.get() + "/secret";
+   File file = new File(secretPath);
+
+   if (!file.exists()) {
+     throw new Exception("Secret file does not exist");
+   }
+
+   List<String> list = new ArrayList<>();
+
+   // try-with-resources: the Scanner (and the file handle behind it) is closed even if reading fails.
+   try (Scanner s = new Scanner(file, UTF_8.name())) {
+     s.useDelimiter("\n");
+
+     while (s.hasNext()) {
+       String next = s.next();
+
+       // Filter out the comment lines
+       if (!next.startsWith("#")) {
+         list.add(next);
+       }
+     }
+   }
+
+   String secretJSON = String.join("", list);
+
+   ObjectMapper mapper = new ObjectMapper();
+
+   Map<String, String> values = mapper.readValue(secretJSON, new TypeReference<Map<String, String>>() {});
+   String publicValue = values.get("public");
+   String privateValue = values.get("private");
+
+   if (publicValue == null || privateValue == null) {
+     // Report a malformed secret file explicitly instead of failing with a bare NullPointerException below.
+     throw new Exception("Secret file does not contain both a 'public' and a 'private' entry");
+   }
+
+   String pubKey = publicValue.replace(".ed25519", "");
+   String privateKey = privateValue.replace(".ed25519", "");
+
+   Bytes pubKeyBytes = Base64.decode(pubKey);
+   Bytes privKeyBytes = Base64.decode(privateKey);
+
+   Signature.PublicKey pub = Signature.PublicKey.fromBytes(pubKeyBytes);
+   Signature.SecretKey secretKey = Signature.SecretKey.fromBytes(privKeyBytes);
+
+   return new Signature.KeyPair(pub, secretKey);
+ }
+
+ /**
+  * Sends twenty 'publish' async requests, each carrying a small "post" message, waits for all responses and prints
+  * them.
+  */
+ @Test
+ @Disabled
+ public void postMessageTest(@VertxInstance Vertx vertx) throws Exception {
+   RPCHandler rpcHandler = makeRPCHandler(vertx);
+
+   List<AsyncResult<RPCMessage>> pending = new ArrayList<>();
+
+   for (int i = 0; i < 20; i++) {
+     // Note: in a real use case, this would more likely be a Java class with these fields
+     HashMap<String, String> params = new HashMap<>();
+     params.put("type", "post");
+     params.put("text", "test test " + i);
+
+     RPCAsyncRequest publishRequest = new RPCAsyncRequest(new RPCFunction("publish"), Arrays.asList(params));
+     pending.add(rpcHandler.makeAsyncRequest(publishRequest));
+   }
+
+   List<RPCMessage> responses = AsyncResult.combine(pending).get();
+
+   for (RPCMessage response : responses) {
+     System.out.println(response.asString());
+   }
+ }
+
+ /**
+  * Connects to the node at localhost:8008 using the local key pair and returns the RPCHandler created for that
+  * connection.
+  *
+  * @param vertx the vertx instance used by the client
+  * @return the handler, once the connection attempt has completed
+  * @throws Exception if the local keys cannot be loaded or the connection result fails
+  */
+ private RPCHandler makeRPCHandler(Vertx vertx) throws Exception {
+   Signature.KeyPair keyPair = getLocalKeys();
+   Bytes32 networkKeyBytes32 = Bytes32.wrap(Base64.decode("1KHLiKZvAvjbY1ziZEHMXawbCEIM6qwjCDm3VYRan/s="));
+
+   // NOTE(review): this local shadows the identically constructed 'loggerProvider' field of the test class.
+   LoggerProvider loggerProvider = SimpleLogger.withLogLevel(Level.DEBUG).toPrintWriter(
+       new PrintWriter(new BufferedWriter(new OutputStreamWriter(System.out, UTF_8))));
+   LoglLogDelegateFactory.setProvider(loggerProvider);
+
+   SecureScuttlebuttVertxClient client =
+       new SecureScuttlebuttVertxClient(loggerProvider, vertx, keyPair, networkKeyBytes32);
+
+   AsyncResult<RPCHandler> onConnect = client.connectTo(
+       8008,
+       "localhost",
+       keyPair.publicKey(),
+       (sender, terminationFn) -> new RPCHandler(sender, terminationFn, new ObjectMapper(), loggerProvider));
+
+   return onConnect.get();
+ }
+
+
+ @Test
+ @Disabled
+ public void streamTest(@VertxInstance Vertx vertx) throws Exception {
+
+ RPCHandler handler = makeRPCHandler(vertx);
+ Signature.PublicKey publicKey = getLocalKeys().publicKey();
+
+ String pubKey = "@" + Base64.encode(publicKey.bytes()) + ".ed25519";
+
+ Map<String, String> params = new HashMap<>();
+ params.put("id", pubKey);
+
+ CompletableAsyncResult<Void> streamEnded = AsyncResult.incomplete();
+
+ RPCStreamRequest streamRequest = new RPCStreamRequest(new RPCFunction("createUserStream"), Arrays.asList(params));
+
+ try {
+ handler.openStream(streamRequest, (closeStream) -> new ScuttlebuttStreamHandler() {
+ @Override
+ public void onMessage(RPCMessage message) {
+ System.out.print(message.asString());
+ }
+
+ @Override
+ public void onStreamEnd() {
+ streamEnded.complete(null);
+ }
+
+ @Override
+ public void onStreamError(Exception ex) {
+
+ }
+ });
+ } catch (ConnectionClosedException e) {
+ throw e;
+ }
+
+ // Wait until the stream is complete
+ streamEnded.get();
+
+ }
+
+
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@tuweni.apache.org
For additional commands, e-mail: commits-help@tuweni.apache.org