You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tuweni.apache.org by to...@apache.org on 2019/04/23 23:09:13 UTC
[incubator-tuweni] 32/43: Bring over latest changes on scuttlebutt
This is an automated email from the ASF dual-hosted git repository.
toulmean pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-tuweni.git
commit ec647af92a1ba8537b2723e7f3d0b690eb1dc2d3
Author: Antoine Toulme <to...@apache.org>
AuthorDate: Mon Apr 22 14:11:23 2019 -0700
Bring over latest changes on scuttlebutt
---
scuttlebutt-rpc/build.gradle | 3 +-
.../tuweni/scuttlebutt/rpc/RPCAsyncRequest.java | 12 +-
.../tuweni/scuttlebutt/rpc/RPCErrorBody.java | 10 +-
.../apache/tuweni/scuttlebutt/rpc/RPCFunction.java | 10 +-
.../apache/tuweni/scuttlebutt/rpc/RPCMessage.java | 31 ++-
.../tuweni/scuttlebutt/rpc/RPCRequestBody.java | 12 +-
.../tuweni/scuttlebutt/rpc/RPCRequestType.java | 10 +-
.../apache/tuweni/scuttlebutt/rpc/RPCResponse.java | 80 +++++++
.../tuweni/scuttlebutt/rpc/RPCStreamRequest.java | 12 +-
.../tuweni/scuttlebutt/rpc/mux/Multiplexer.java | 22 +-
.../tuweni/scuttlebutt/rpc/mux/RPCHandler.java | 243 +++++++++++----------
.../rpc/mux/ScuttlebuttStreamHandler.java | 14 +-
.../mux/exceptions/ConnectionClosedException.java | 10 +-
.../mux/exceptions/RPCRequestFailedException.java | 20 ++
.../scuttlebutt/rpc/PatchworkIntegrationTest.java | 8 -
.../rpc/mux/PatchworkIntegrationTest.java | 99 ++++-----
16 files changed, 361 insertions(+), 235 deletions(-)
diff --git a/scuttlebutt-rpc/build.gradle b/scuttlebutt-rpc/build.gradle
index cf71a72..1c04ff4 100644
--- a/scuttlebutt-rpc/build.gradle
+++ b/scuttlebutt-rpc/build.gradle
@@ -1,9 +1,10 @@
-description = 'Scuttlebutt Handshake library'
+description = 'Scuttlebutt RPC library'
dependencies {
compile project(':bytes')
compile project(':concurrent')
compile project(':crypto')
+ compileOnly 'io.vertx:vertx-core'
compile project(':scuttlebutt')
compile project(':scuttlebutt-handshake')
compile 'org.logl:logl-api'
diff --git a/scuttlebutt-rpc/src/main/java/org/apache/tuweni/scuttlebutt/rpc/RPCAsyncRequest.java b/scuttlebutt-rpc/src/main/java/org/apache/tuweni/scuttlebutt/rpc/RPCAsyncRequest.java
index f4281d7..b94c224 100644
--- a/scuttlebutt-rpc/src/main/java/org/apache/tuweni/scuttlebutt/rpc/RPCAsyncRequest.java
+++ b/scuttlebutt-rpc/src/main/java/org/apache/tuweni/scuttlebutt/rpc/RPCAsyncRequest.java
@@ -1,8 +1,8 @@
/*
- * Copyright 2019 ConsenSys AG.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
@@ -10,9 +10,9 @@
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
-package net.consensys.cava.scuttlebutt.rpc;
+package org.apache.tuweni.scuttlebutt.rpc;
-import net.consensys.cava.bytes.Bytes;
+import org.apache.tuweni.bytes.Bytes;
import java.util.List;
diff --git a/scuttlebutt-rpc/src/main/java/org/apache/tuweni/scuttlebutt/rpc/RPCErrorBody.java b/scuttlebutt-rpc/src/main/java/org/apache/tuweni/scuttlebutt/rpc/RPCErrorBody.java
index 87fa5b6..bd1e888 100644
--- a/scuttlebutt-rpc/src/main/java/org/apache/tuweni/scuttlebutt/rpc/RPCErrorBody.java
+++ b/scuttlebutt-rpc/src/main/java/org/apache/tuweni/scuttlebutt/rpc/RPCErrorBody.java
@@ -1,8 +1,8 @@
/*
- * Copyright 2019 ConsenSys AG.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
@@ -10,7 +10,7 @@
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
-package net.consensys.cava.scuttlebutt.rpc;
+package org.apache.tuweni.scuttlebutt.rpc;
/**
* An RPC message response body which contains an error
diff --git a/scuttlebutt-rpc/src/main/java/org/apache/tuweni/scuttlebutt/rpc/RPCFunction.java b/scuttlebutt-rpc/src/main/java/org/apache/tuweni/scuttlebutt/rpc/RPCFunction.java
index 511a491..cef5071 100644
--- a/scuttlebutt-rpc/src/main/java/org/apache/tuweni/scuttlebutt/rpc/RPCFunction.java
+++ b/scuttlebutt-rpc/src/main/java/org/apache/tuweni/scuttlebutt/rpc/RPCFunction.java
@@ -1,8 +1,8 @@
/*
- * Copyright 2019 ConsenSys AG.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
@@ -10,7 +10,7 @@
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
-package net.consensys.cava.scuttlebutt.rpc;
+package org.apache.tuweni.scuttlebutt.rpc;
import java.util.ArrayList;
import java.util.List;
diff --git a/scuttlebutt-rpc/src/main/java/org/apache/tuweni/scuttlebutt/rpc/RPCMessage.java b/scuttlebutt-rpc/src/main/java/org/apache/tuweni/scuttlebutt/rpc/RPCMessage.java
index 2f21645..7dade5d 100644
--- a/scuttlebutt-rpc/src/main/java/org/apache/tuweni/scuttlebutt/rpc/RPCMessage.java
+++ b/scuttlebutt-rpc/src/main/java/org/apache/tuweni/scuttlebutt/rpc/RPCMessage.java
@@ -15,11 +15,12 @@ package org.apache.tuweni.scuttlebutt.rpc;
import static java.nio.charset.StandardCharsets.UTF_8;
import org.apache.tuweni.bytes.Bytes;
+import org.apache.tuweni.scuttlebutt.rpc.mux.exceptions.RPCRequestFailedException;
import java.io.IOException;
+import java.util.Optional;
import com.fasterxml.jackson.databind.ObjectMapper;
-import com.google.common.base.Optional;
/**
* Decoded RPC message, making elements of the message available directly.
@@ -104,17 +105,41 @@ public final class RPCMessage {
if (!isErrorMessage()) {
// If the body of the response is 'true' or the error flag isn't set, it's a successful end condition
- return Optional.absent();
+ return Optional.empty();
} else {
try {
return Optional.of(asJSON(objectMapper, RPCErrorBody.class));
} catch (IOException e) {
- return Optional.absent();
+ return Optional.empty();
}
}
}
/**
+ *
+ * @param objectMapper the object mapper to deserialize the error with.
+ *
+ * @return an exception if this represents an error RPC response, otherwise nothing
+ */
+ public Optional<RPCRequestFailedException> getException(ObjectMapper objectMapper) {
+ if (isErrorMessage()) {
+ Optional<RPCRequestFailedException> exception =
+ getErrorBody(objectMapper).map(errorBody -> new RPCRequestFailedException(errorBody.getMessage()));
+
+ if (!exception.isPresent()) {
+ // If we failed to deserialize into the RPCErrorBody type there may be a bug in the server implementation
+ // which prevented it returning the correct type, so we just print whatever it returned
+ return Optional.of(new RPCRequestFailedException(this.asString()));
+ } else {
+ return exception;
+ }
+
+ } else {
+ return Optional.empty();
+ }
+ }
+
+ /**
* Provides the type of the body of the message: a binary message, a UTF-8 string or a JSON message.
*
* @return the type of the body: a binary message, a UTF-8 string or a JSON message
diff --git a/scuttlebutt-rpc/src/main/java/org/apache/tuweni/scuttlebutt/rpc/RPCRequestBody.java b/scuttlebutt-rpc/src/main/java/org/apache/tuweni/scuttlebutt/rpc/RPCRequestBody.java
index 70ad4e3..471d5b9 100644
--- a/scuttlebutt-rpc/src/main/java/org/apache/tuweni/scuttlebutt/rpc/RPCRequestBody.java
+++ b/scuttlebutt-rpc/src/main/java/org/apache/tuweni/scuttlebutt/rpc/RPCRequestBody.java
@@ -1,8 +1,8 @@
/*
- * Copyright 2019 ConsenSys AG.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
@@ -10,9 +10,9 @@
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
-package net.consensys.cava.scuttlebutt.rpc;
+package org.apache.tuweni.scuttlebutt.rpc;
-import net.consensys.cava.bytes.Bytes;
+import org.apache.tuweni.bytes.Bytes;
import java.util.List;
diff --git a/scuttlebutt-rpc/src/main/java/org/apache/tuweni/scuttlebutt/rpc/RPCRequestType.java b/scuttlebutt-rpc/src/main/java/org/apache/tuweni/scuttlebutt/rpc/RPCRequestType.java
index 9703da4..85a18b2 100644
--- a/scuttlebutt-rpc/src/main/java/org/apache/tuweni/scuttlebutt/rpc/RPCRequestType.java
+++ b/scuttlebutt-rpc/src/main/java/org/apache/tuweni/scuttlebutt/rpc/RPCRequestType.java
@@ -1,8 +1,8 @@
/*
- * Copyright 2019 ConsenSys AG.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
@@ -10,7 +10,7 @@
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
-package net.consensys.cava.scuttlebutt.rpc;
+package org.apache.tuweni.scuttlebutt.rpc;
import com.fasterxml.jackson.annotation.JsonProperty;
diff --git a/scuttlebutt-rpc/src/main/java/org/apache/tuweni/scuttlebutt/rpc/RPCResponse.java b/scuttlebutt-rpc/src/main/java/org/apache/tuweni/scuttlebutt/rpc/RPCResponse.java
new file mode 100644
index 0000000..73c6e86
--- /dev/null
+++ b/scuttlebutt-rpc/src/main/java/org/apache/tuweni/scuttlebutt/rpc/RPCResponse.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.tuweni.scuttlebutt.rpc;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+
+import org.apache.tuweni.bytes.Bytes;
+import org.apache.tuweni.scuttlebutt.rpc.RPCFlag.BodyType;
+
+import java.io.IOException;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+/**
+ * A successful RPC response.
+ */
+public class RPCResponse {
+
+ private final Bytes body;
+ private final BodyType bodyType;
+
+ /**
+ * A successful RPC response.
+ *
+ * @param body the body of the response in bytes
+ * @param bodyType the type of the response (e.g. JSON, UTF-8 or binary.)
+ */
+ public RPCResponse(Bytes body, BodyType bodyType) {
+
+ this.body = body;
+ this.bodyType = bodyType;
+ }
+
+ /**
+ * @return the RPC response body
+ */
+ public Bytes body() {
+ return body;
+ }
+
+ /**
+ * @return The type of the data contained in the body.
+ */
+ public BodyType bodyType() {
+ return bodyType;
+ }
+
+ /**
+ * Provides the body of the message as a UTF-8 string.
+ *
+ * @return the body of the message as a UTF-8 string
+ */
+ public String asString() {
+ return new String(body().toArrayUnsafe(), UTF_8);
+ }
+
+ /**
+ * Provides the body of the message, marshalled as a JSON object.
+ *
+ * @param objectMapper the object mapper to deserialize with
+ * @param clazz the JSON object class
+ * @param <T> the matching JSON object class
+ * @return a new instance of the JSON object class
+ * @throws IOException if an error occurs during marshalling
+ */
+ public <T> T asJSON(ObjectMapper objectMapper, Class<T> clazz) throws IOException {
+ return objectMapper.readerFor(clazz).readValue(body().toArrayUnsafe());
+ }
+
+}
diff --git a/scuttlebutt-rpc/src/main/java/org/apache/tuweni/scuttlebutt/rpc/RPCStreamRequest.java b/scuttlebutt-rpc/src/main/java/org/apache/tuweni/scuttlebutt/rpc/RPCStreamRequest.java
index ff198da..1249182 100644
--- a/scuttlebutt-rpc/src/main/java/org/apache/tuweni/scuttlebutt/rpc/RPCStreamRequest.java
+++ b/scuttlebutt-rpc/src/main/java/org/apache/tuweni/scuttlebutt/rpc/RPCStreamRequest.java
@@ -1,8 +1,8 @@
/*
- * Copyright 2019 ConsenSys AG.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
@@ -10,9 +10,9 @@
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
-package net.consensys.cava.scuttlebutt.rpc;
+package org.apache.tuweni.scuttlebutt.rpc;
-import net.consensys.cava.bytes.Bytes;
+import org.apache.tuweni.bytes.Bytes;
import java.util.List;
diff --git a/scuttlebutt-rpc/src/main/java/org/apache/tuweni/scuttlebutt/rpc/mux/Multiplexer.java b/scuttlebutt-rpc/src/main/java/org/apache/tuweni/scuttlebutt/rpc/mux/Multiplexer.java
index 722dcbf..66b7d74 100644
--- a/scuttlebutt-rpc/src/main/java/org/apache/tuweni/scuttlebutt/rpc/mux/Multiplexer.java
+++ b/scuttlebutt-rpc/src/main/java/org/apache/tuweni/scuttlebutt/rpc/mux/Multiplexer.java
@@ -1,8 +1,8 @@
/*
- * Copyright 2019 ConsenSys AG.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
@@ -10,13 +10,13 @@
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
-package net.consensys.cava.scuttlebutt.rpc.mux;
+package org.apache.tuweni.scuttlebutt.rpc.mux;
-import net.consensys.cava.concurrent.AsyncResult;
-import net.consensys.cava.scuttlebutt.rpc.RPCAsyncRequest;
-import net.consensys.cava.scuttlebutt.rpc.RPCMessage;
-import net.consensys.cava.scuttlebutt.rpc.RPCStreamRequest;
-import net.consensys.cava.scuttlebutt.rpc.mux.exceptions.ConnectionClosedException;
+import org.apache.tuweni.concurrent.AsyncResult;
+import org.apache.tuweni.scuttlebutt.rpc.RPCAsyncRequest;
+import org.apache.tuweni.scuttlebutt.rpc.RPCResponse;
+import org.apache.tuweni.scuttlebutt.rpc.RPCStreamRequest;
+import org.apache.tuweni.scuttlebutt.rpc.mux.exceptions.ConnectionClosedException;
import java.util.function.Function;
@@ -35,7 +35,7 @@ public interface Multiplexer {
*
* @return an async result which will be completed with the result or an error if the request fails.
*/
- AsyncResult<RPCMessage> makeAsyncRequest(RPCAsyncRequest request);
+ AsyncResult<RPCResponse> makeAsyncRequest(RPCAsyncRequest request) throws JsonProcessingException;
/**
* Creates a request which opens a stream (e.g. a 'source' in the protocol docs.)
diff --git a/scuttlebutt-rpc/src/main/java/org/apache/tuweni/scuttlebutt/rpc/mux/RPCHandler.java b/scuttlebutt-rpc/src/main/java/org/apache/tuweni/scuttlebutt/rpc/mux/RPCHandler.java
index 0ac4583..4655862 100644
--- a/scuttlebutt-rpc/src/main/java/org/apache/tuweni/scuttlebutt/rpc/mux/RPCHandler.java
+++ b/scuttlebutt-rpc/src/main/java/org/apache/tuweni/scuttlebutt/rpc/mux/RPCHandler.java
@@ -1,8 +1,8 @@
/*
- * Copyright 2019 ConsenSys AG.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
@@ -10,41 +10,36 @@
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
-package net.consensys.cava.scuttlebutt.rpc.mux;
-
-import net.consensys.cava.bytes.Bytes;
-import net.consensys.cava.concurrent.AsyncResult;
-import net.consensys.cava.concurrent.CompletableAsyncResult;
-import net.consensys.cava.scuttlebutt.handshake.vertx.ClientHandler;
-import net.consensys.cava.scuttlebutt.rpc.RPCAsyncRequest;
-import net.consensys.cava.scuttlebutt.rpc.RPCCodec;
-import net.consensys.cava.scuttlebutt.rpc.RPCErrorBody;
-import net.consensys.cava.scuttlebutt.rpc.RPCFlag;
-import net.consensys.cava.scuttlebutt.rpc.RPCMessage;
-import net.consensys.cava.scuttlebutt.rpc.RPCStreamRequest;
-import net.consensys.cava.scuttlebutt.rpc.mux.exceptions.ConnectionClosedException;
+package org.apache.tuweni.scuttlebutt.rpc.mux;
+
+import org.apache.tuweni.bytes.Bytes;
+import org.apache.tuweni.concurrent.AsyncResult;
+import org.apache.tuweni.concurrent.CompletableAsyncResult;
+import org.apache.tuweni.scuttlebutt.handshake.vertx.ClientHandler;
+import org.apache.tuweni.scuttlebutt.rpc.RPCAsyncRequest;
+import org.apache.tuweni.scuttlebutt.rpc.RPCCodec;
+import org.apache.tuweni.scuttlebutt.rpc.RPCFlag;
+import org.apache.tuweni.scuttlebutt.rpc.RPCMessage;
+import org.apache.tuweni.scuttlebutt.rpc.RPCResponse;
+import org.apache.tuweni.scuttlebutt.rpc.RPCStreamRequest;
+import org.apache.tuweni.scuttlebutt.rpc.mux.exceptions.ConnectionClosedException;
+import org.apache.tuweni.scuttlebutt.rpc.mux.exceptions.RPCRequestFailedException;
import java.util.HashMap;
import java.util.Map;
+import java.util.Optional;
import java.util.function.Consumer;
import java.util.function.Function;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
-import com.google.common.base.Optional;
+import io.vertx.core.Handler;
+import io.vertx.core.Vertx;
import org.logl.Logger;
import org.logl.LoggerProvider;
/**
- * Handles RPC requests and responses from an active connection to a scuttlebutt node
- *
- * Note: the public methods on this class are synchronized so that a request is rejected if the connection has been
- * closed before it begins and any 'in flight' requests are ended exceptionally with a 'connection closed' error without
- * new incoming requests being added to the maps by threads.
- *
- * In the future,we could perhaps be carefully more fine grained about the locking if we require a high degree of
- * concurrency.
- *
+ * Handles RPC requests and responses from an active connection to a scuttlebutt node.
*/
public class RPCHandler implements Multiplexer, ClientHandler {
@@ -53,7 +48,13 @@ public class RPCHandler implements Multiplexer, ClientHandler {
private final Runnable connectionCloser;
private final ObjectMapper objectMapper;
- private Map<Integer, CompletableAsyncResult<RPCMessage>> awaitingAsyncResponse = new HashMap<>();
+ /**
+ * We run each update on the vertx event loop to update the request state synchronously, and to handle the
+ * underlying connection closing by failing the in progress requests and not accepting future requests
+ */
+ private final Vertx vertx;
+
+ private Map<Integer, CompletableAsyncResult<RPCResponse>> awaitingAsyncResponse = new HashMap<>();
private Map<Integer, ScuttlebuttStreamHandler> streams = new HashMap<>();
private boolean closed;
@@ -61,16 +62,19 @@ public class RPCHandler implements Multiplexer, ClientHandler {
/**
* Makes RPC requests over a connection
*
+ * @param vertx The vertx instance to queue requests with
* @param messageSender sends the request to the node
* @param terminationFn closes the connection
* @param objectMapper the objectMapper to serialize and deserialize message request and response bodies
* @param logger
*/
public RPCHandler(
+ Vertx vertx,
Consumer<Bytes> messageSender,
Runnable terminationFn,
ObjectMapper objectMapper,
LoggerProvider logger) {
+ this.vertx = vertx;
this.messageSender = messageSender;
this.connectionCloser = terminationFn;
this.closed = false;
@@ -80,86 +84,116 @@ public class RPCHandler implements Multiplexer, ClientHandler {
}
@Override
- public synchronized AsyncResult<RPCMessage> makeAsyncRequest(RPCAsyncRequest request) {
+ public AsyncResult<RPCResponse> makeAsyncRequest(RPCAsyncRequest request) throws JsonProcessingException {
- CompletableAsyncResult<RPCMessage> result = AsyncResult.incomplete();
+ Bytes bodyBytes = request.toEncodedRpcMessage(objectMapper);
- if (closed) {
- result.completeExceptionally(new ConnectionClosedException());
- }
+ CompletableAsyncResult<RPCResponse> result = AsyncResult.incomplete();
- try {
- RPCMessage message = new RPCMessage(request.toEncodedRpcMessage(objectMapper));
- int requestNumber = message.requestNumber();
- awaitingAsyncResponse.put(requestNumber, result);
- Bytes bytes = RPCCodec.encodeRequest(message.body(), requestNumber, request.getRPCFlags());
- messageSender.accept(bytes);
+ Handler<Void> synchronizedAddRequest = (event) -> {
+ if (closed) {
+ result.completeExceptionally(new ConnectionClosedException());
+ } else {
+ RPCMessage message = new RPCMessage(bodyBytes);
+ int requestNumber = message.requestNumber();
- } catch (JsonProcessingException e) {
- result.completeExceptionally(e);
- }
+ awaitingAsyncResponse.put(requestNumber, result);
+ Bytes bytes = RPCCodec.encodeRequest(message.body(), requestNumber, request.getRPCFlags());
+ sendBytes(bytes);
+ }
+ };
+ vertx.runOnContext(synchronizedAddRequest);
return result;
}
@Override
- public synchronized void openStream(
- RPCStreamRequest request,
- Function<Runnable, ScuttlebuttStreamHandler> responseSink) throws JsonProcessingException,
- ConnectionClosedException {
+ public void openStream(RPCStreamRequest request, Function<Runnable, ScuttlebuttStreamHandler> responseSink)
+ throws JsonProcessingException {
- if (closed) {
- throw new ConnectionClosedException();
- }
+ Bytes bodyBytes = request.toEncodedRpcMessage(objectMapper);
+
+ Handler<Void> synchronizedRequest = (event) -> {
- try {
RPCFlag[] rpcFlags = request.getRPCFlags();
- RPCMessage message = new RPCMessage(request.toEncodedRpcMessage(objectMapper));
+ RPCMessage message = new RPCMessage(bodyBytes);
int requestNumber = message.requestNumber();
- Bytes bytes = RPCCodec.encodeRequest(message.body(), requestNumber, rpcFlags);
- messageSender.accept(bytes);
-
- Runnable closeStreamHandler = new Runnable() {
- @Override
- public void run() {
+ Bytes requestBytes = RPCCodec.encodeRequest(message.body(), requestNumber, rpcFlags);
- try {
- Bytes bytes = RPCCodec.encodeStreamEndRequest(requestNumber);
- messageSender.accept(bytes);
- } catch (JsonProcessingException e) {
- logger.warn("Unexpectedly could not encode stream end message to JSON.");
- }
+ Runnable closeStreamHandler = () -> {
+ try {
+ Bytes streamEnd = RPCCodec.encodeStreamEndRequest(requestNumber);
+ sendBytes(streamEnd);
+ } catch (JsonProcessingException e) {
+ logger.warn("Unexpectedly could not encode stream end message to JSON.");
}
+
};
ScuttlebuttStreamHandler scuttlebuttStreamHandler = responseSink.apply(closeStreamHandler);
- streams.put(requestNumber, scuttlebuttStreamHandler);
- } catch (JsonProcessingException ex) {
- throw ex;
- }
+ if (closed) {
+ scuttlebuttStreamHandler.onStreamError(new ConnectionClosedException());
+ } else {
+ streams.put(requestNumber, scuttlebuttStreamHandler);
+ sendBytes(requestBytes);
+ }
+
+
+ };
+
+ vertx.runOnContext(synchronizedRequest);
}
@Override
- public synchronized void close() {
- connectionCloser.run();
+ public void close() {
+ vertx.runOnContext((event) -> {
+ connectionCloser.run();
+ });
}
@Override
- public synchronized void receivedMessage(Bytes message) {
+ public void receivedMessage(Bytes message) {
- RPCMessage rpcMessage = new RPCMessage(message);
+ Handler<Void> synchronizedHandleMessage = (event) -> {
+ RPCMessage rpcMessage = new RPCMessage(message);
- // A negative request number indicates that this is a response, rather than a request that this node
- // should service
- if (rpcMessage.requestNumber() < 0) {
- handleResponse(rpcMessage);
- } else {
- handleRequest(rpcMessage);
- }
+ // A negative request number indicates that this is a response, rather than a request that this node
+ // should service
+ if (rpcMessage.requestNumber() < 0) {
+ handleResponse(rpcMessage);
+ } else {
+ handleRequest(rpcMessage);
+ }
+ };
+ vertx.runOnContext(synchronizedHandleMessage);
+ }
+
+ @Override
+ public void streamClosed() {
+
+ Handler<Void> synchronizedCloseStream = (event) -> {
+ closed = true;
+
+ streams.forEach((key, streamHandler) -> {
+ streamHandler.onStreamError(new ConnectionClosedException());
+ });
+
+ streams.clear();
+
+ awaitingAsyncResponse.forEach((key, value) -> {
+ if (!value.isDone()) {
+ value.completeExceptionally(new ConnectionClosedException());
+ }
+ });
+
+ awaitingAsyncResponse.clear();
+ };
+
+ vertx.runOnContext(synchronizedCloseStream);
}
private void handleRequest(RPCMessage rpcMessage) {
@@ -179,6 +213,8 @@ public class RPCHandler implements Multiplexer, ClientHandler {
boolean isStream = RPCFlag.Stream.STREAM.isApplied(rpcFlags);
+ Optional<RPCRequestFailedException> exception = response.getException(objectMapper);
+
if (isStream) {
ScuttlebuttStreamHandler scuttlebuttStreamHandler = streams.get(requestNumber);
@@ -187,20 +223,11 @@ public class RPCHandler implements Multiplexer, ClientHandler {
if (response.isSuccessfulLastMessage()) {
streams.remove(requestNumber);
scuttlebuttStreamHandler.onStreamEnd();
- } else if (response.isErrorMessage()) {
-
- Optional<RPCErrorBody> errorBody = response.getErrorBody(objectMapper);
-
- if (errorBody.isPresent()) {
- scuttlebuttStreamHandler.onStreamError(new Exception(errorBody.get().getMessage()));
- } else {
- // This shouldn't happen, but for safety we fall back to just writing the whole body in the exception message
- // if we fail to marshall it for whatever reason
- scuttlebuttStreamHandler.onStreamError(new Exception(response.asString()));
- }
-
+ } else if (exception.isPresent()) {
+ scuttlebuttStreamHandler.onStreamError(exception.get());
} else {
- scuttlebuttStreamHandler.onMessage(response);
+ RPCResponse successfulResponse = new RPCResponse(response.body(), response.bodyType());
+ scuttlebuttStreamHandler.onMessage(successfulResponse);
}
} else {
logger.warn(
@@ -212,11 +239,18 @@ public class RPCHandler implements Multiplexer, ClientHandler {
} else {
- CompletableAsyncResult<RPCMessage> rpcMessageFuture = awaitingAsyncResponse.get(requestNumber);
+ CompletableAsyncResult<RPCResponse> rpcMessageFuture = awaitingAsyncResponse.remove(requestNumber);
if (rpcMessageFuture != null) {
- rpcMessageFuture.complete(response);
- awaitingAsyncResponse.remove(requestNumber);
+
+ if (exception.isPresent()) {
+ rpcMessageFuture.completeExceptionally(exception.get());
+ } else {
+ RPCResponse successfulResponse = new RPCResponse(response.body(), response.bodyType());
+
+ rpcMessageFuture.complete(successfulResponse);
+ }
+
} else {
logger.warn(
"Couldn't find async handler for RPC response with request number "
@@ -228,23 +262,8 @@ public class RPCHandler implements Multiplexer, ClientHandler {
}
- @Override
- public void streamClosed() {
- this.closed = true;
-
- streams.forEach((key, streamHandler) -> {
- streamHandler.onStreamError(new ConnectionClosedException());
- });
-
- streams.clear();
-
- awaitingAsyncResponse.forEach((key, value) -> {
- if (!value.isDone()) {
- value.completeExceptionally(new ConnectionClosedException());
- }
-
- });
-
-
+ private void sendBytes(Bytes bytes) {
+ messageSender.accept(bytes);
}
+
}
diff --git a/scuttlebutt-rpc/src/main/java/org/apache/tuweni/scuttlebutt/rpc/mux/ScuttlebuttStreamHandler.java b/scuttlebutt-rpc/src/main/java/org/apache/tuweni/scuttlebutt/rpc/mux/ScuttlebuttStreamHandler.java
index d108663..d64b54a 100644
--- a/scuttlebutt-rpc/src/main/java/org/apache/tuweni/scuttlebutt/rpc/mux/ScuttlebuttStreamHandler.java
+++ b/scuttlebutt-rpc/src/main/java/org/apache/tuweni/scuttlebutt/rpc/mux/ScuttlebuttStreamHandler.java
@@ -1,8 +1,8 @@
/*
- * Copyright 2019 ConsenSys AG.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
@@ -10,9 +10,9 @@
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
-package net.consensys.cava.scuttlebutt.rpc.mux;
+package org.apache.tuweni.scuttlebutt.rpc.mux;
-import net.consensys.cava.scuttlebutt.rpc.RPCMessage;
+import org.apache.tuweni.scuttlebutt.rpc.RPCResponse;
/**
* Handles incoming items from a result stream
@@ -24,7 +24,7 @@ public interface ScuttlebuttStreamHandler {
*
* @param message
*/
- void onMessage(RPCMessage message);
+ void onMessage(RPCResponse message);
/**
* Invoked when the stream has been closed.
diff --git a/scuttlebutt-rpc/src/main/java/org/apache/tuweni/scuttlebutt/rpc/mux/exceptions/ConnectionClosedException.java b/scuttlebutt-rpc/src/main/java/org/apache/tuweni/scuttlebutt/rpc/mux/exceptions/ConnectionClosedException.java
index 160946c..a3c269c 100644
--- a/scuttlebutt-rpc/src/main/java/org/apache/tuweni/scuttlebutt/rpc/mux/exceptions/ConnectionClosedException.java
+++ b/scuttlebutt-rpc/src/main/java/org/apache/tuweni/scuttlebutt/rpc/mux/exceptions/ConnectionClosedException.java
@@ -1,8 +1,8 @@
/*
- * Copyright 2019 ConsenSys AG.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
@@ -10,7 +10,7 @@
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
-package net.consensys.cava.scuttlebutt.rpc.mux.exceptions;
+package org.apache.tuweni.scuttlebutt.rpc.mux.exceptions;
public class ConnectionClosedException extends Exception {
diff --git a/scuttlebutt-rpc/src/main/java/org/apache/tuweni/scuttlebutt/rpc/mux/exceptions/RPCRequestFailedException.java b/scuttlebutt-rpc/src/main/java/org/apache/tuweni/scuttlebutt/rpc/mux/exceptions/RPCRequestFailedException.java
new file mode 100644
index 0000000..5baefca
--- /dev/null
+++ b/scuttlebutt-rpc/src/main/java/org/apache/tuweni/scuttlebutt/rpc/mux/exceptions/RPCRequestFailedException.java
@@ -0,0 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.tuweni.scuttlebutt.rpc.mux.exceptions;
+
+public final class RPCRequestFailedException extends RuntimeException {
+
+ public RPCRequestFailedException(String errorMessage) {
+ super(errorMessage);
+ }
+}
diff --git a/scuttlebutt-rpc/src/test/java/org/apache/tuweni/scuttlebutt/rpc/PatchworkIntegrationTest.java b/scuttlebutt-rpc/src/test/java/org/apache/tuweni/scuttlebutt/rpc/PatchworkIntegrationTest.java
index 3178fae..9d8527d 100644
--- a/scuttlebutt-rpc/src/test/java/org/apache/tuweni/scuttlebutt/rpc/PatchworkIntegrationTest.java
+++ b/scuttlebutt-rpc/src/test/java/org/apache/tuweni/scuttlebutt/rpc/PatchworkIntegrationTest.java
@@ -16,13 +16,11 @@ import static java.nio.charset.StandardCharsets.UTF_8;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
-import static org.junit.jupiter.api.Assumptions.assumeTrue;
import org.apache.tuweni.bytes.Bytes;
import org.apache.tuweni.bytes.Bytes32;
import org.apache.tuweni.concurrent.AsyncResult;
import org.apache.tuweni.crypto.sodium.Signature;
-import org.apache.tuweni.crypto.sodium.Sodium;
import org.apache.tuweni.io.Base64;
import org.apache.tuweni.junit.VertxExtension;
import org.apache.tuweni.junit.VertxInstance;
@@ -43,7 +41,6 @@ import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Optional;
import io.vertx.core.Vertx;
-import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
@@ -60,11 +57,6 @@ import org.logl.vertx.LoglLogDelegateFactory;
@ExtendWith(VertxExtension.class)
class PatchworkIntegrationTest {
- @BeforeAll
- static void checkAvailable() {
- assumeTrue(Sodium.isAvailable(), "Sodium native library is not available");
- }
-
public static class MyClientHandler implements ClientHandler {
private final Consumer<Bytes> sender;
diff --git a/scuttlebutt-rpc/src/test/java/org/apache/tuweni/scuttlebutt/rpc/mux/PatchworkIntegrationTest.java b/scuttlebutt-rpc/src/test/java/org/apache/tuweni/scuttlebutt/rpc/mux/PatchworkIntegrationTest.java
index 293ac92..6581b40 100644
--- a/scuttlebutt-rpc/src/test/java/org/apache/tuweni/scuttlebutt/rpc/mux/PatchworkIntegrationTest.java
+++ b/scuttlebutt-rpc/src/test/java/org/apache/tuweni/scuttlebutt/rpc/mux/PatchworkIntegrationTest.java
@@ -1,8 +1,8 @@
/*
- * Copyright 2019 ConsenSys AG.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
@@ -10,7 +10,7 @@
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
-package net.consensys.cava.scuttlebutt.rpc.mux;
+package org.apache.tuweni.scuttlebutt.rpc.mux;
/*
* Copyright 2019 ConsenSys AG.
@@ -26,22 +26,20 @@ package net.consensys.cava.scuttlebutt.rpc.mux;
*/
import static java.nio.charset.StandardCharsets.UTF_8;
import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertFalse;
-
-import net.consensys.cava.bytes.Bytes;
-import net.consensys.cava.bytes.Bytes32;
-import net.consensys.cava.concurrent.AsyncResult;
-import net.consensys.cava.concurrent.CompletableAsyncResult;
-import net.consensys.cava.crypto.sodium.Signature;
-import net.consensys.cava.io.Base64;
-import net.consensys.cava.junit.VertxExtension;
-import net.consensys.cava.junit.VertxInstance;
-import net.consensys.cava.scuttlebutt.handshake.vertx.SecureScuttlebuttVertxClient;
-import net.consensys.cava.scuttlebutt.rpc.RPCAsyncRequest;
-import net.consensys.cava.scuttlebutt.rpc.RPCFunction;
-import net.consensys.cava.scuttlebutt.rpc.RPCMessage;
-import net.consensys.cava.scuttlebutt.rpc.RPCStreamRequest;
-import net.consensys.cava.scuttlebutt.rpc.mux.exceptions.ConnectionClosedException;
+
+import org.apache.tuweni.bytes.Bytes;
+import org.apache.tuweni.bytes.Bytes32;
+import org.apache.tuweni.concurrent.AsyncResult;
+import org.apache.tuweni.concurrent.CompletableAsyncResult;
+import org.apache.tuweni.crypto.sodium.Signature;
+import org.apache.tuweni.io.Base64;
+import org.apache.tuweni.junit.VertxExtension;
+import org.apache.tuweni.junit.VertxInstance;
+import org.apache.tuweni.scuttlebutt.handshake.vertx.SecureScuttlebuttVertxClient;
+import org.apache.tuweni.scuttlebutt.rpc.RPCAsyncRequest;
+import org.apache.tuweni.scuttlebutt.rpc.RPCFunction;
+import org.apache.tuweni.scuttlebutt.rpc.RPCResponse;
+import org.apache.tuweni.scuttlebutt.rpc.RPCStreamRequest;
import java.io.BufferedWriter;
import java.io.File;
@@ -78,27 +76,21 @@ public class PatchworkIntegrationTest {
RPCHandler rpcHandler = makeRPCHandler(vertx);
- List<AsyncResult<RPCMessage>> results = new ArrayList<>();
+ List<AsyncResult<RPCResponse>> results = new ArrayList<>();
for (int i = 0; i < 10; i++) {
RPCFunction function = new RPCFunction("whoami");
RPCAsyncRequest asyncRequest = new RPCAsyncRequest(function, new ArrayList<>());
- AsyncResult<RPCMessage> res = rpcHandler.makeAsyncRequest(asyncRequest);
+ AsyncResult<RPCResponse> res = rpcHandler.makeAsyncRequest(asyncRequest);
results.add(res);
}
- AsyncResult<List<RPCMessage>> allResults = AsyncResult.combine(results);
- List<RPCMessage> rpcMessages = allResults.get();
+ AsyncResult<List<RPCResponse>> allResults = AsyncResult.combine(results);
+ List<RPCResponse> rpcMessages = allResults.get();
assertEquals(10, rpcMessages.size());
-
- rpcMessages.forEach(msg -> {
- assertFalse(msg.lastMessageOrError());
-
- });
-
}
@@ -158,7 +150,7 @@ public class PatchworkIntegrationTest {
RPCHandler rpcHandler = makeRPCHandler(vertx);
- List<AsyncResult<RPCMessage>> results = new ArrayList<>();
+ List<AsyncResult<RPCResponse>> results = new ArrayList<>();
for (int i = 0; i < 20; i++) {
// Note: in a real use case, this would more likely be a Java class with these fields
@@ -168,13 +160,13 @@ public class PatchworkIntegrationTest {
RPCAsyncRequest asyncRequest = new RPCAsyncRequest(new RPCFunction("publish"), Arrays.asList(params));
- AsyncResult<RPCMessage> rpcMessageAsyncResult = rpcHandler.makeAsyncRequest(asyncRequest);
+ AsyncResult<RPCResponse> rpcMessageAsyncResult = rpcHandler.makeAsyncRequest(asyncRequest);
results.add(rpcMessageAsyncResult);
}
- List<RPCMessage> rpcMessages = AsyncResult.combine(results).get();
+ List<RPCResponse> rpcMessages = AsyncResult.combine(results).get();
rpcMessages.forEach(msg -> System.out.println(msg.asString()));
}
@@ -196,7 +188,7 @@ public class PatchworkIntegrationTest {
AsyncResult<RPCHandler> onConnect =
secureScuttlebuttVertxClient.connectTo(port, host, keyPair.publicKey(), (sender, terminationFn) -> {
- return new RPCHandler(sender, terminationFn, new ObjectMapper(), loggerProvider);
+ return new RPCHandler(vertx, sender, terminationFn, new ObjectMapper(), loggerProvider);
});
return onConnect.get();
@@ -219,26 +211,23 @@ public class PatchworkIntegrationTest {
RPCStreamRequest streamRequest = new RPCStreamRequest(new RPCFunction("createUserStream"), Arrays.asList(params));
- try {
- handler.openStream(streamRequest, (closeStream) -> new ScuttlebuttStreamHandler() {
- @Override
- public void onMessage(RPCMessage message) {
- System.out.print(message.asString());
- }
-
- @Override
- public void onStreamEnd() {
- streamEnded.complete(null);
- }
-
- @Override
- public void onStreamError(Exception ex) {
-
- }
- });
- } catch (ConnectionClosedException e) {
- throw e;
- }
+ handler.openStream(streamRequest, (closeStream) -> new ScuttlebuttStreamHandler() {
+ @Override
+ public void onMessage(RPCResponse message) {
+ System.out.print(message.asString());
+ }
+
+ @Override
+ public void onStreamEnd() {
+ streamEnded.complete(null);
+ }
+
+ @Override
+ public void onStreamError(Exception ex) {
+
+ streamEnded.completeExceptionally(ex);
+ }
+ });
// Wait until the stream is complete
streamEnded.get();
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@tuweni.apache.org
For additional commands, e-mail: commits-help@tuweni.apache.org