You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tuweni.apache.org by to...@apache.org on 2019/05/23 19:07:16 UTC
[incubator-tuweni] branch master updated: Plumtree and gossip app feedback
This is an automated email from the ASF dual-hosted git repository.
toulmean pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-tuweni.git
The following commit(s) were added to refs/heads/master by this push:
new 29f585a Plumtree and gossip app feedback
29f585a is described below
commit 29f585a760e29895de7f02a71f7e8b006934b865
Author: Antoine Toulme <an...@lunar-ocean.com>
AuthorDate: Thu May 23 12:07:05 2019 -0700
Plumtree and gossip app feedback
---
gossip/build.gradle | 2 ++
.../java/org/apache/tuweni/gossip/GossipApp.java | 15 +++++++++-
.../tuweni/plumtree/vertx/VertxGossipServer.java | 34 +++++++++++++---------
.../plumtree/vertx/VertxGossipServerTest.java | 3 +-
4 files changed, 38 insertions(+), 16 deletions(-)
diff --git a/gossip/build.gradle b/gossip/build.gradle
index f708004..f0eba00 100644
--- a/gossip/build.gradle
+++ b/gossip/build.gradle
@@ -21,6 +21,8 @@ dependencies {
compile 'info.picocli:picocli'
compile 'io.vertx:vertx-core'
compile 'org.bouncycastle:bcprov-jdk15on'
+ compile 'org.logl:logl-api'
+ compile 'org.logl:logl-logl'
compile project(':bytes')
compile project(':config')
compile project(':plumtree')
diff --git a/gossip/src/main/java/org/apache/tuweni/gossip/GossipApp.java b/gossip/src/main/java/org/apache/tuweni/gossip/GossipApp.java
index ad7293c..173ec7b 100644
--- a/gossip/src/main/java/org/apache/tuweni/gossip/GossipApp.java
+++ b/gossip/src/main/java/org/apache/tuweni/gossip/GossipApp.java
@@ -27,6 +27,7 @@ import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;
+import java.security.Security;
import java.time.Instant;
import java.util.Collections;
import java.util.concurrent.CompletionException;
@@ -39,6 +40,7 @@ import io.vertx.core.Vertx;
import io.vertx.core.http.HttpMethod;
import io.vertx.core.http.HttpServer;
import io.vertx.core.http.HttpServerRequest;
+import org.bouncycastle.jce.provider.BouncyCastleProvider;
import picocli.CommandLine;
/**
@@ -48,6 +50,7 @@ import picocli.CommandLine;
public final class GossipApp {
public static void main(String[] args) {
+ Security.addProvider(new BouncyCastleProvider());
GossipCommandLineOptions opts = CommandLine.populateCommand(new GossipCommandLineOptions(), args);
try {
opts.validate();
@@ -61,10 +64,12 @@ public final class GossipApp {
System.exit(0);
}
GossipApp gossipApp = new GossipApp(Vertx.vertx(), opts, System.err, System.out, () -> System.exit(1));
- Runtime.getRuntime().addShutdownHook(new Thread(() -> gossipApp.stop()));
+ Runtime.getRuntime().addShutdownHook(new Thread(gossipApp::stop));
gossipApp.start();
}
+
+
private final GossipCommandLineOptions opts;
private final Runnable terminateFunction;
private final PrintStream errStream;
@@ -79,6 +84,7 @@ public final class GossipApp {
PrintStream outStream,
Runnable terminateFunction) {
EphemeralPeerRepository repository = new EphemeralPeerRepository();
+ outStream.println("Setting up server on " + opts.networkInterface() + ":" + opts.listenPort());
server = new VertxGossipServer(
vertx,
opts.networkInterface(),
@@ -95,6 +101,7 @@ public final class GossipApp {
}
void start() {
+ outStream.println("Starting gossip");
AsyncCompletion completion = server.start();
try {
completion.join();
@@ -102,6 +109,7 @@ public final class GossipApp {
errStream.println("Server could not start: " + e.getMessage());
terminateFunction.run();
}
+ outStream.println("TCP server started");
CompletableAsyncCompletion rpcCompletion = AsyncCompletion.incomplete();
rpcServer.requestHandler(this::handleRPCRequest).listen(opts.rpcPort(), opts.networkInterface(), res -> {
@@ -117,6 +125,7 @@ public final class GossipApp {
errStream.println("RPC server could not start: " + e.getMessage());
terminateFunction.run();
}
+ outStream.println("RPC server started");
try {
AsyncCompletion
@@ -125,6 +134,7 @@ public final class GossipApp {
} catch (TimeoutException | InterruptedException e) {
errStream.println("Server could not connect to other peers: " + e.getMessage());
}
+ outStream.println("Gossip started");
}
private void handleRPCRequest(HttpServerRequest httpServerRequest) {
@@ -143,6 +153,7 @@ public final class GossipApp {
}
void stop() {
+ outStream.println("Stopping gossip");
try {
server.stop().join();
} catch (InterruptedException e) {
@@ -161,6 +172,7 @@ public final class GossipApp {
try {
rpcCompletion.join();
} catch (CompletionException | InterruptedException e) {
+ outStream.println("Stopped gossip");
errStream.println("RPC server could not stop: " + e.getMessage());
terminateFunction.run();
}
@@ -184,6 +196,7 @@ public final class GossipApp {
}
public void publish(Bytes message) {
+ outStream.println("Message to publish " + message.toHexString());
server.gossip("", message);
}
}
diff --git a/plumtree/src/main/java/org/apache/tuweni/plumtree/vertx/VertxGossipServer.java b/plumtree/src/main/java/org/apache/tuweni/plumtree/vertx/VertxGossipServer.java
index 9cb2d38..dce838d 100644
--- a/plumtree/src/main/java/org/apache/tuweni/plumtree/vertx/VertxGossipServer.java
+++ b/plumtree/src/main/java/org/apache/tuweni/plumtree/vertx/VertxGossipServer.java
@@ -187,26 +187,32 @@ public final class VertxGossipServer {
if (!started.get()) {
throw new IllegalStateException("Server has not started");
}
+
CompletableAsyncCompletion completion = AsyncCompletion.incomplete();
AtomicInteger counter = new AtomicInteger(0);
- while (!completion.isDone()) {
- client.connect(port, host, res -> {
- if (res.failed()) {
- if (counter.incrementAndGet() > 5) {
- completion.completeExceptionally(res.cause());
- }
- } else {
- completion.complete();
- Peer peer = new SocketPeer(res.result());
- SocketHandler handler = new SocketHandler(peer);
- res.result().handler(handler::handle).closeHandler(handler::close);
- }
- });
- }
+
+ roundConnect(host, port, counter, completion);
return completion;
}
+ private void roundConnect(String host, int port, AtomicInteger counter, CompletableAsyncCompletion completion) {
+ client.connect(port, host, res -> {
+ if (res.failed()) {
+ if (counter.incrementAndGet() > 5) {
+ completion.completeExceptionally(res.cause());
+ } else {
+ roundConnect(host, port, counter, completion);
+ }
+ } else {
+ Peer peer = new SocketPeer(res.result());
+ SocketHandler handler = new SocketHandler(peer);
+ res.result().handler(handler::handle).closeHandler(handler::close);
+ completion.complete();
+ }
+ });
+ }
+
/**
* Gossip a message to all known peers.
*
diff --git a/plumtree/src/test/java/org/apache/tuweni/plumtree/vertx/VertxGossipServerTest.java b/plumtree/src/test/java/org/apache/tuweni/plumtree/vertx/VertxGossipServerTest.java
index 19e2fa9..d5dd5d8 100644
--- a/plumtree/src/test/java/org/apache/tuweni/plumtree/vertx/VertxGossipServerTest.java
+++ b/plumtree/src/test/java/org/apache/tuweni/plumtree/vertx/VertxGossipServerTest.java
@@ -166,13 +166,14 @@ class VertxGossipServerTest {
server1.connectTo("127.0.0.1", 10001).join();
server2.connectTo("127.0.0.1", 10002).join();
server1.connectTo("127.0.0.1", 10002).join();
+ assertEquals(2, peerRepository1.eagerPushPeers().size());
String attributes = "{\"message_type\": \"BLOCK\"}";
server1.gossip(attributes, Bytes.fromHexString("deadbeef"));
Thread.sleep(1000);
assertEquals(Bytes.fromHexString("deadbeef"), messageReceived2.get());
Thread.sleep(1000);
- assertTrue(peerRepository1.lazyPushPeers().size() > 1 || peerRepository3.lazyPushPeers().size() > 1);
+ assertTrue(peerRepository1.lazyPushPeers().size() > 0 || peerRepository3.lazyPushPeers().size() > 0);
server1.stop().join();
server2.stop().join();
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@tuweni.apache.org
For additional commands, e-mail: commits-help@tuweni.apache.org