You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tuweni.apache.org by to...@apache.org on 2019/05/23 19:07:16 UTC

[incubator-tuweni] branch master updated: Plumtree and gossip app feedback

This is an automated email from the ASF dual-hosted git repository.

toulmean pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-tuweni.git


The following commit(s) were added to refs/heads/master by this push:
     new 29f585a  Plumtree and gossip app feedback
29f585a is described below

commit 29f585a760e29895de7f02a71f7e8b006934b865
Author: Antoine Toulme <an...@lunar-ocean.com>
AuthorDate: Thu May 23 12:07:05 2019 -0700

    Plumtree and gossip app feedback
---
 gossip/build.gradle                                |  2 ++
 .../java/org/apache/tuweni/gossip/GossipApp.java   | 15 +++++++++-
 .../tuweni/plumtree/vertx/VertxGossipServer.java   | 34 +++++++++++++---------
 .../plumtree/vertx/VertxGossipServerTest.java      |  3 +-
 4 files changed, 38 insertions(+), 16 deletions(-)

diff --git a/gossip/build.gradle b/gossip/build.gradle
index f708004..f0eba00 100644
--- a/gossip/build.gradle
+++ b/gossip/build.gradle
@@ -21,6 +21,8 @@ dependencies {
   compile 'info.picocli:picocli'
   compile 'io.vertx:vertx-core'
   compile 'org.bouncycastle:bcprov-jdk15on'
+  compile 'org.logl:logl-api'
+  compile 'org.logl:logl-logl'
   compile project(':bytes')
   compile project(':config')
   compile project(':plumtree')
diff --git a/gossip/src/main/java/org/apache/tuweni/gossip/GossipApp.java b/gossip/src/main/java/org/apache/tuweni/gossip/GossipApp.java
index ad7293c..173ec7b 100644
--- a/gossip/src/main/java/org/apache/tuweni/gossip/GossipApp.java
+++ b/gossip/src/main/java/org/apache/tuweni/gossip/GossipApp.java
@@ -27,6 +27,7 @@ import java.nio.file.Files;
 import java.nio.file.Path;
 import java.nio.file.Paths;
 import java.nio.file.StandardOpenOption;
+import java.security.Security;
 import java.time.Instant;
 import java.util.Collections;
 import java.util.concurrent.CompletionException;
@@ -39,6 +40,7 @@ import io.vertx.core.Vertx;
 import io.vertx.core.http.HttpMethod;
 import io.vertx.core.http.HttpServer;
 import io.vertx.core.http.HttpServerRequest;
+import org.bouncycastle.jce.provider.BouncyCastleProvider;
 import picocli.CommandLine;
 
 /**
@@ -48,6 +50,7 @@ import picocli.CommandLine;
 public final class GossipApp {
 
   public static void main(String[] args) {
+    Security.addProvider(new BouncyCastleProvider());
     GossipCommandLineOptions opts = CommandLine.populateCommand(new GossipCommandLineOptions(), args);
     try {
       opts.validate();
@@ -61,10 +64,12 @@ public final class GossipApp {
       System.exit(0);
     }
     GossipApp gossipApp = new GossipApp(Vertx.vertx(), opts, System.err, System.out, () -> System.exit(1));
-    Runtime.getRuntime().addShutdownHook(new Thread(() -> gossipApp.stop()));
+    Runtime.getRuntime().addShutdownHook(new Thread(gossipApp::stop));
     gossipApp.start();
   }
 
+
+
   private final GossipCommandLineOptions opts;
   private final Runnable terminateFunction;
   private final PrintStream errStream;
@@ -79,6 +84,7 @@ public final class GossipApp {
       PrintStream outStream,
       Runnable terminateFunction) {
     EphemeralPeerRepository repository = new EphemeralPeerRepository();
+    outStream.println("Setting up server on " + opts.networkInterface() + ":" + opts.listenPort());
     server = new VertxGossipServer(
         vertx,
         opts.networkInterface(),
@@ -95,6 +101,7 @@ public final class GossipApp {
   }
 
   void start() {
+    outStream.println("Starting gossip");
     AsyncCompletion completion = server.start();
     try {
       completion.join();
@@ -102,6 +109,7 @@ public final class GossipApp {
       errStream.println("Server could not start: " + e.getMessage());
       terminateFunction.run();
     }
+    outStream.println("TCP server started");
 
     CompletableAsyncCompletion rpcCompletion = AsyncCompletion.incomplete();
     rpcServer.requestHandler(this::handleRPCRequest).listen(opts.rpcPort(), opts.networkInterface(), res -> {
@@ -117,6 +125,7 @@ public final class GossipApp {
       errStream.println("RPC server could not start: " + e.getMessage());
       terminateFunction.run();
     }
+    outStream.println("RPC server started");
 
     try {
       AsyncCompletion
@@ -125,6 +134,7 @@ public final class GossipApp {
     } catch (TimeoutException | InterruptedException e) {
       errStream.println("Server could not connect to other peers: " + e.getMessage());
     }
+    outStream.println("Gossip started");
   }
 
   private void handleRPCRequest(HttpServerRequest httpServerRequest) {
@@ -143,6 +153,7 @@ public final class GossipApp {
   }
 
   void stop() {
+    outStream.println("Stopping gossip");
     try {
       server.stop().join();
     } catch (InterruptedException e) {
@@ -161,6 +172,7 @@ public final class GossipApp {
     try {
       rpcCompletion.join();
     } catch (CompletionException | InterruptedException e) {
+      outStream.println("Stopped gossip");
       errStream.println("RPC server could not stop: " + e.getMessage());
       terminateFunction.run();
     }
@@ -184,6 +196,7 @@ public final class GossipApp {
   }
 
   public void publish(Bytes message) {
+    outStream.println("Message to publish " + message.toHexString());
     server.gossip("", message);
   }
 }
diff --git a/plumtree/src/main/java/org/apache/tuweni/plumtree/vertx/VertxGossipServer.java b/plumtree/src/main/java/org/apache/tuweni/plumtree/vertx/VertxGossipServer.java
index 9cb2d38..dce838d 100644
--- a/plumtree/src/main/java/org/apache/tuweni/plumtree/vertx/VertxGossipServer.java
+++ b/plumtree/src/main/java/org/apache/tuweni/plumtree/vertx/VertxGossipServer.java
@@ -187,26 +187,32 @@ public final class VertxGossipServer {
     if (!started.get()) {
       throw new IllegalStateException("Server has not started");
     }
+
     CompletableAsyncCompletion completion = AsyncCompletion.incomplete();
     AtomicInteger counter = new AtomicInteger(0);
-    while (!completion.isDone()) {
-      client.connect(port, host, res -> {
-        if (res.failed()) {
-          if (counter.incrementAndGet() > 5) {
-            completion.completeExceptionally(res.cause());
-          }
-        } else {
-          completion.complete();
-          Peer peer = new SocketPeer(res.result());
-          SocketHandler handler = new SocketHandler(peer);
-          res.result().handler(handler::handle).closeHandler(handler::close);
-        }
-      });
-    }
+
+    roundConnect(host, port, counter, completion);
 
     return completion;
   }
 
+  private void roundConnect(String host, int port, AtomicInteger counter, CompletableAsyncCompletion completion) {
+    client.connect(port, host, res -> {
+      if (res.failed()) {
+        if (counter.incrementAndGet() > 5) {
+          completion.completeExceptionally(res.cause());
+        } else {
+          roundConnect(host, port, counter, completion);
+        }
+      } else {
+        Peer peer = new SocketPeer(res.result());
+        SocketHandler handler = new SocketHandler(peer);
+        res.result().handler(handler::handle).closeHandler(handler::close);
+        completion.complete();
+      }
+    });
+  }
+
   /**
    * Gossip a message to all known peers.
    * 
diff --git a/plumtree/src/test/java/org/apache/tuweni/plumtree/vertx/VertxGossipServerTest.java b/plumtree/src/test/java/org/apache/tuweni/plumtree/vertx/VertxGossipServerTest.java
index 19e2fa9..d5dd5d8 100644
--- a/plumtree/src/test/java/org/apache/tuweni/plumtree/vertx/VertxGossipServerTest.java
+++ b/plumtree/src/test/java/org/apache/tuweni/plumtree/vertx/VertxGossipServerTest.java
@@ -166,13 +166,14 @@ class VertxGossipServerTest {
     server1.connectTo("127.0.0.1", 10001).join();
     server2.connectTo("127.0.0.1", 10002).join();
     server1.connectTo("127.0.0.1", 10002).join();
+    assertEquals(2, peerRepository1.eagerPushPeers().size());
     String attributes = "{\"message_type\": \"BLOCK\"}";
     server1.gossip(attributes, Bytes.fromHexString("deadbeef"));
     Thread.sleep(1000);
     assertEquals(Bytes.fromHexString("deadbeef"), messageReceived2.get());
     Thread.sleep(1000);
 
-    assertTrue(peerRepository1.lazyPushPeers().size() > 1 || peerRepository3.lazyPushPeers().size() > 1);
+    assertTrue(peerRepository1.lazyPushPeers().size() > 0 || peerRepository3.lazyPushPeers().size() > 0);
 
     server1.stop().join();
     server2.stop().join();


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@tuweni.apache.org
For additional commands, e-mail: commits-help@tuweni.apache.org