You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tuweni.apache.org by to...@apache.org on 2019/04/26 00:23:51 UTC

[incubator-tuweni] 13/48: Add a sample integration example to plumtree

This is an automated email from the ASF dual-hosted git repository.

toulmean pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-tuweni.git

commit 1da45edfe6114b0632690e00e1f4728c45572944
Author: Antoine Toulme <to...@apache.org>
AuthorDate: Wed Apr 3 16:58:53 2019 -0700

    Add a sample integration example to plumtree
---
 plumtree/build.gradle                              |   4 +
 .../{StateActor.java => MessageValidator.java}     |  18 +-
 .../main/java/org/apache/tuweni/plumtree/Peer.java |   5 +-
 .../java/org/apache/tuweni/plumtree/State.java     | 115 +++++++++---
 .../SocketPeer.java}                               |  26 ++-
 .../tuweni/plumtree/vertx/VertxGossipServer.java   | 208 +++++++++++++++++++++
 .../java/org/apache/tuweni/plumtree/StateTest.java |  92 ++++-----
 .../plumtree/vertx/VertxGossipServerTest.java      | 165 ++++++++++++++++
 8 files changed, 562 insertions(+), 71 deletions(-)

diff --git a/plumtree/build.gradle b/plumtree/build.gradle
index 9d6569f..041b8bb 100644
--- a/plumtree/build.gradle
+++ b/plumtree/build.gradle
@@ -2,9 +2,13 @@ description = 'Plumtree - Push-Lazy-pUsh Multicast TREE, an implementation of Ep
 
 dependencies {
   compile project(':bytes')
+  compile project(':concurrent')
   compile project(':crypto')
 
+  compileOnly 'io.vertx:vertx-core'
+
   testCompile project(':junit')
+  testCompile 'io.vertx:vertx-core'
   testCompile 'org.bouncycastle:bcprov-jdk15on'
   testCompile 'org.junit.jupiter:junit-jupiter-api'
   testCompile 'org.junit.jupiter:junit-jupiter-params'
diff --git a/plumtree/src/main/java/org/apache/tuweni/plumtree/StateActor.java b/plumtree/src/main/java/org/apache/tuweni/plumtree/MessageValidator.java
similarity index 63%
rename from plumtree/src/main/java/org/apache/tuweni/plumtree/StateActor.java
rename to plumtree/src/main/java/org/apache/tuweni/plumtree/MessageValidator.java
index dc5029d..d17132d 100644
--- a/plumtree/src/main/java/org/apache/tuweni/plumtree/StateActor.java
+++ b/plumtree/src/main/java/org/apache/tuweni/plumtree/MessageValidator.java
@@ -12,5 +12,21 @@
  */
 package org.apache.tuweni.plumtree;
 
-public interface StateActor {
+import org.apache.tuweni.bytes.Bytes;
+
+/**
+ * Validator for a message and a peer.
+ *
+ * This validator is called prior to gossiping the message from that peer to other peers.
+ */
+public interface MessageValidator {
+
+  /**
+   * Validates that the message from the peer is valid.
+   *
+   * @param message the payload sent over the network
+   * @param peer the peer that sent the message
+   * @return true if the message is valid
+   */
+  boolean validate(Bytes message, Peer peer);
 }
diff --git a/plumtree/src/main/java/org/apache/tuweni/plumtree/Peer.java b/plumtree/src/main/java/org/apache/tuweni/plumtree/Peer.java
index bf42d86..10e87b3 100644
--- a/plumtree/src/main/java/org/apache/tuweni/plumtree/Peer.java
+++ b/plumtree/src/main/java/org/apache/tuweni/plumtree/Peer.java
@@ -12,5 +12,8 @@
  */
 package org.apache.tuweni.plumtree;
 
-public class Peer {
+/**
+ * A peer part of the gossip system.
+ */
+public interface Peer {
 }
diff --git a/plumtree/src/main/java/org/apache/tuweni/plumtree/State.java b/plumtree/src/main/java/org/apache/tuweni/plumtree/State.java
index 36ecd37..f35f358 100644
--- a/plumtree/src/main/java/org/apache/tuweni/plumtree/State.java
+++ b/plumtree/src/main/java/org/apache/tuweni/plumtree/State.java
@@ -15,15 +15,18 @@ package org.apache.tuweni.plumtree;
 import org.apache.tuweni.bytes.Bytes;
 
 import java.util.ArrayList;
+import java.util.Collections;
+import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Queue;
 import java.util.Timer;
 import java.util.TimerTask;
-import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.Consumer;
 import java.util.stream.Collectors;
+import javax.annotation.Nullable;
 
 /**
  * Local state to our peer, representing the make-up of the tree of peers.
@@ -32,8 +35,19 @@ public final class State {
 
   private final PeerRepository peerRepository;
   private final MessageHashing messageHashingFunction;
-  private final Map<Bytes, MessageHandler> messageHandlers = new ConcurrentHashMap<>();
+  private final int maxMessagesHandlers = 1000000;
+  private final Map<Bytes, MessageHandler> messageHandlers =
+      Collections.synchronizedMap(new LinkedHashMap<Bytes, MessageHandler>() {
+
+        @Override
+        protected boolean removeEldestEntry(final Map.Entry<Bytes, MessageHandler> eldest) {
+          return super.size() > maxMessagesHandlers;
+        }
+      });
+
   private final MessageSender messageSender;
+  private final Consumer<Bytes> messageListener;
+  private final MessageValidator messageValidator;
   private final Queue<Runnable> lazyQueue = new ConcurrentLinkedQueue<>();
   private final Timer timer = new Timer("plumtree", true);
   private final long delay;
@@ -41,6 +55,7 @@ public final class State {
   final class MessageHandler {
 
     private final Bytes hash;
+
     private final AtomicBoolean receivedFullMessage = new AtomicBoolean(false);
     private final AtomicBoolean requestingGraftMessage = new AtomicBoolean(false);
     private List<TimerTask> tasks = new ArrayList<>();
@@ -50,26 +65,38 @@ public final class State {
       this.hash = hash;
     }
 
-    void fullMessageReceived(Peer sender, Bytes message) {
+    /**
+     * Acts on receiving the full message
+     * 
+     * @param sender the sender - may be null if we are submitting this message to the network
+     * @param message the payload to send to the network
+     */
+    void fullMessageReceived(@Nullable Peer sender, Bytes message) {
       if (receivedFullMessage.compareAndSet(false, true)) {
         for (TimerTask task : tasks) {
           task.cancel();
         }
-        for (Peer peer : peerRepository.eagerPushPeers()) {
-          if (!sender.equals(peer)) {
-            messageSender.sendMessage(MessageSender.Verb.GOSSIP, peer, message);
+
+        if (sender == null || messageValidator.validate(message, sender)) {
+          for (Peer peer : peerRepository.eagerPushPeers()) {
+            if (sender == null || !sender.equals(peer)) {
+              messageSender.sendMessage(MessageSender.Verb.GOSSIP, peer, message);
+            }
           }
+          lazyQueue.addAll(
+              peerRepository
+                  .lazyPushPeers()
+                  .stream()
+                  .filter(p -> !lazyPeers.contains(p))
+                  .map(peer -> (Runnable) (() -> messageSender.sendMessage(MessageSender.Verb.IHAVE, peer, hash)))
+                  .collect(Collectors.toList()));
+          messageListener.accept(message);
         }
-        lazyQueue.addAll(
-            peerRepository
-                .lazyPushPeers()
-                .stream()
-                .filter(p -> !lazyPeers.contains(p))
-                .map(peer -> (Runnable) (() -> messageSender.sendMessage(MessageSender.Verb.IHAVE, peer, hash)))
-                .collect(Collectors.toList()));
       } else {
-        messageSender.sendMessage(MessageSender.Verb.PRUNE, sender, null);
-        peerRepository.moveToLazy(sender);
+        if (sender != null) {
+          messageSender.sendMessage(MessageSender.Verb.PRUNE, sender, null);
+          peerRepository.moveToLazy(sender);
+        }
       }
     }
 
@@ -99,19 +126,49 @@ public final class State {
     }
   }
 
-  public State(PeerRepository peerRepository, MessageHashing messageHashingFunction, MessageSender messageSender) {
-    this(peerRepository, messageHashingFunction, messageSender, 5000, 5000);
+  /**
+   * Constructor using default time constants.
+   * 
+   * @param peerRepository the peer repository to use to store and access peer information.
+   * @param messageHashingFunction the function to use to hash messages into hashes to compare them.
+   * @param messageSender a function abstracting sending messages to other peers.
+   * @param messageListener a function consuming messages when they are gossiped.
+   * @param messageValidator a function validating messages before they are gossiped to other peers.
+   */
+  public State(
+      PeerRepository peerRepository,
+      MessageHashing messageHashingFunction,
+      MessageSender messageSender,
+      Consumer<Bytes> messageListener,
+      MessageValidator messageValidator) {
+    this(peerRepository, messageHashingFunction, messageSender, messageListener, messageValidator, 5000, 5000);
   }
 
+  /**
+   * Constructor using default time constants.
+   * 
+   * @param peerRepository the peer repository to use to store and access peer information.
+   * @param messageHashingFunction the function to use to hash messages into hashes to compare them.
+   * @param messageSender a function abstracting sending messages to other peers.
+   * @param messageListener a function consuming messages when they are gossiped.
+   * @param messageValidator a function validating messages before they are gossiped to other peers.
+   * @param graftDelay delay in milliseconds to apply before this peer grafts an other peer when it finds that peer has
+   *        data it misses.
+   * @param lazyQueueInterval the interval in milliseconds between sending messages to lazy peers.
+   */
   public State(
       PeerRepository peerRepository,
       MessageHashing messageHashingFunction,
       MessageSender messageSender,
+      Consumer<Bytes> messageListener,
+      MessageValidator messageValidator,
       long graftDelay,
       long lazyQueueInterval) {
     this.peerRepository = peerRepository;
     this.messageHashingFunction = messageHashingFunction;
     this.messageSender = messageSender;
+    this.messageListener = messageListener;
+    this.messageValidator = messageValidator;
     this.delay = graftDelay;
     timer.schedule(new TimerTask() {
       @Override
@@ -132,7 +189,7 @@ public final class State {
 
   /**
    * Removes a peer from the collection of peers we are connected to.
-   * 
+   *
    * @param peer the peer to remove
    */
   public void removePeer(Peer peer) {
@@ -141,31 +198,31 @@ public final class State {
   }
 
   /**
-   * Records a message was received in full from a peer
+   * Records a message was received in full from a peer.
    *
    * @param peer the peer that sent the message
    * @param message the hash of the message
    */
-  public void receiveFullMessage(Peer peer, Bytes message) {
+  public void receiveGossipMessage(Peer peer, Bytes message) {
     peerRepository.considerNewPeer(peer);
     MessageHandler handler = messageHandlers.computeIfAbsent(messageHashingFunction.hash(message), MessageHandler::new);
     handler.fullMessageReceived(peer, message);
   }
 
   /**
-   * Records a message was partially received from a peer
+   * Records a message was partially received from a peer.
    *
    * @param peer the peer that sent the message
    * @param messageHash the hash of the message
    */
-  public void receiveHeaderMessage(Peer peer, Bytes messageHash) {
+  public void receiveIHaveMessage(Peer peer, Bytes messageHash) {
     MessageHandler handler = messageHandlers.computeIfAbsent(messageHash, MessageHandler::new);
     handler.partialMessageReceived(peer);
   }
 
   /**
    * Requests a peer be pruned away from the eager peers into the lazy peers
-   * 
+   *
    * @param peer the peer to move to lazy peers
    */
   public void receivePruneMessage(Peer peer) {
@@ -174,7 +231,7 @@ public final class State {
 
   /**
    * Requests a peer be grafted to the eager peers list
-   * 
+   *
    * @param peer the peer to add to the eager peers
    * @param messageHash the hash of the message that triggers this grafting
    */
@@ -183,6 +240,16 @@ public final class State {
     messageSender.sendMessage(MessageSender.Verb.GOSSIP, peer, messageHash);
   }
 
+  /**
+   * Sends a gossip message to all peers, according to their status.
+   * 
+   * @param message the message to propagate
+   */
+  public void sendGossipMessage(Bytes message) {
+    MessageHandler handler = messageHandlers.computeIfAbsent(messageHashingFunction.hash(message), MessageHandler::new);
+    handler.fullMessageReceived(null, message);
+  }
+
   void processQueue() {
     for (Runnable r : lazyQueue) {
       r.run();
diff --git a/plumtree/src/main/java/org/apache/tuweni/plumtree/StateActorFactory.java b/plumtree/src/main/java/org/apache/tuweni/plumtree/vertx/SocketPeer.java
similarity index 63%
rename from plumtree/src/main/java/org/apache/tuweni/plumtree/StateActorFactory.java
rename to plumtree/src/main/java/org/apache/tuweni/plumtree/vertx/SocketPeer.java
index 61d076b..d5322c2 100644
--- a/plumtree/src/main/java/org/apache/tuweni/plumtree/StateActorFactory.java
+++ b/plumtree/src/main/java/org/apache/tuweni/plumtree/vertx/SocketPeer.java
@@ -10,9 +10,29 @@
  * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
  * specific language governing permissions and limitations under the License.
  */
-package org.apache.tuweni.plumtree;
+package org.apache.tuweni.plumtree.vertx;
 
-public interface StateActorFactory {
+import org.apache.tuweni.plumtree.Peer;
 
-  public StateActor create(State state);
+import io.vertx.core.net.NetSocket;
+
+/**
+ * Vert.x gossip peer associated with a socket
+ */
+final class SocketPeer implements Peer {
+
+  private final NetSocket socket;
+
+  SocketPeer(NetSocket socket) {
+    this.socket = socket;
+  }
+
+  NetSocket socket() {
+    return socket;
+  }
+
+  @Override
+  public String toString() {
+    return socket.localAddress().toString();
+  }
 }
diff --git a/plumtree/src/main/java/org/apache/tuweni/plumtree/vertx/VertxGossipServer.java b/plumtree/src/main/java/org/apache/tuweni/plumtree/vertx/VertxGossipServer.java
new file mode 100644
index 0000000..303ac60
--- /dev/null
+++ b/plumtree/src/main/java/org/apache/tuweni/plumtree/vertx/VertxGossipServer.java
@@ -0,0 +1,208 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.tuweni.plumtree.vertx;
+
+import org.apache.tuweni.bytes.Bytes;
+import org.apache.tuweni.concurrent.AsyncCompletion;
+import org.apache.tuweni.concurrent.CompletableAsyncCompletion;
+import org.apache.tuweni.plumtree.MessageHashing;
+import org.apache.tuweni.plumtree.MessageSender;
+import org.apache.tuweni.plumtree.MessageValidator;
+import org.apache.tuweni.plumtree.Peer;
+import org.apache.tuweni.plumtree.PeerRepository;
+import org.apache.tuweni.plumtree.State;
+
+import java.io.IOException;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.Consumer;
+
+import com.fasterxml.jackson.core.JsonParser;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import io.vertx.core.Vertx;
+import io.vertx.core.buffer.Buffer;
+import io.vertx.core.net.NetClient;
+import io.vertx.core.net.NetServer;
+
+/**
+ * Vert.x implementation of the plumtree gossip.
+ *
+ * This implementation is provided as an example and relies on a simplistic JSON serialization of messages.
+ *
+ */
+public final class VertxGossipServer {
+
+  private static final ObjectMapper mapper = new ObjectMapper();
+
+  private static final class Message {
+
+    public MessageSender.Verb verb;
+    public String payload;
+  }
+  private final class SocketHandler {
+
+    private final Peer peer;
+
+    SocketHandler(Peer peer) {
+      this.peer = peer;
+      state.addPeer(peer);
+    }
+
+    private Bytes buffer = Bytes.EMPTY;
+
+    void handle(Buffer data) {
+      buffer = Bytes.concatenate(buffer, Bytes.wrapBuffer(data));
+      Message message;
+      try {
+        JsonParser parser = mapper.getFactory().createParser(buffer.toArrayUnsafe());
+        message = parser.readValueAs(Message.class);
+        buffer = buffer.slice((int) parser.getCurrentLocation().getByteOffset());
+      } catch (IOException e) {
+        return;
+      }
+
+      switch (message.verb) {
+        case IHAVE:
+          state.receiveIHaveMessage(peer, Bytes.fromHexString(message.payload));
+          break;
+        case GOSSIP:
+          state.receiveGossipMessage(peer, Bytes.fromHexString(message.payload));
+          break;
+        case GRAFT:
+          state.receiveGraftMessage(peer, Bytes.fromHexString(message.payload));
+          break;
+        case PRUNE:
+          state.receivePruneMessage(peer);
+          break;
+      }
+    }
+
+    void close(Void aVoid) {
+      state.removePeer(peer);
+    }
+  }
+
+  private final AtomicBoolean started = new AtomicBoolean(false);
+  private final Vertx vertx;
+  private final int port;
+  private final MessageHashing messageHashing;
+  private final String networkInterface;
+  private final PeerRepository peerRepository;
+  private final Consumer<Bytes> payloadListener;
+  private final MessageValidator payloadValidator;
+  private State state;
+  private NetServer server;
+  private NetClient client;
+
+  public VertxGossipServer(
+      Vertx vertx,
+      String networkInterface,
+      int port,
+      MessageHashing messageHashing,
+      PeerRepository peerRepository,
+      Consumer<Bytes> payloadListener,
+      MessageValidator payloadValidator) {
+    this.vertx = vertx;
+    this.networkInterface = networkInterface;
+    this.port = port;
+    this.messageHashing = messageHashing;
+    this.peerRepository = peerRepository;
+    this.payloadListener = payloadListener;
+    this.payloadValidator = payloadValidator;
+  }
+
+  public AsyncCompletion start() {
+    if (started.compareAndSet(false, true)) {
+      CompletableAsyncCompletion completion = AsyncCompletion.incomplete();
+      server = vertx.createNetServer();
+      client = vertx.createNetClient();
+      server.connectHandler(socket -> {
+        Peer peer = new SocketPeer(socket);
+        SocketHandler handler = new SocketHandler(peer);
+        socket.handler(handler::handle).closeHandler(handler::close);
+      });
+      server.listen(port, networkInterface, res -> {
+        if (res.failed()) {
+          completion.completeExceptionally(res.cause());
+        } else {
+          state = new State(peerRepository, messageHashing, (verb, peer, payload) -> {
+            Message message = new Message();
+            message.verb = verb;
+            message.payload = payload == null ? null : payload.toHexString();
+            try {
+              ((SocketPeer) peer).socket().write(Buffer.buffer(mapper.writeValueAsBytes(message)));
+            } catch (JsonProcessingException e) {
+              throw new RuntimeException(e);
+            }
+          }, payloadListener, payloadValidator);
+
+          completion.complete();
+        }
+      });
+
+      return completion;
+    } else {
+      return AsyncCompletion.completed();
+    }
+  }
+
+  public AsyncCompletion stop() {
+    if (started.compareAndSet(true, false)) {
+      CompletableAsyncCompletion completion = AsyncCompletion.incomplete();
+
+      state.stop();
+      client.close();
+      server.close(res -> {
+        if (res.failed()) {
+          completion.completeExceptionally(res.cause());
+        } else {
+          completion.complete();
+        }
+      });
+
+      return completion;
+    }
+    return AsyncCompletion.completed();
+  }
+
+  public AsyncCompletion connectTo(String host, int port) {
+    if (!started.get()) {
+      throw new IllegalStateException("Server has not started");
+    }
+    CompletableAsyncCompletion completion = AsyncCompletion.incomplete();
+    client.connect(port, host, res -> {
+      if (res.failed()) {
+        completion.completeExceptionally(res.cause());
+      } else {
+        completion.complete();
+        Peer peer = new SocketPeer(res.result());
+        SocketHandler handler = new SocketHandler(peer);
+        res.result().handler(handler::handle).closeHandler(handler::close);
+      }
+    });
+
+    return completion;
+  }
+
+  /**
+   * Gossip a message to all known peers.
+   *
+   * @param message the payload to propagate
+   */
+  public void gossip(Bytes message) {
+    if (!started.get()) {
+      throw new IllegalStateException("Server has not started");
+    }
+    state.sendGossipMessage(message);
+  }
+}
diff --git a/plumtree/src/test/java/org/apache/tuweni/plumtree/StateTest.java b/plumtree/src/test/java/org/apache/tuweni/plumtree/StateTest.java
index ef4cade..3dfb206 100644
--- a/plumtree/src/test/java/org/apache/tuweni/plumtree/StateTest.java
+++ b/plumtree/src/test/java/org/apache/tuweni/plumtree/StateTest.java
@@ -21,12 +21,18 @@ import org.apache.tuweni.bytes.Bytes32;
 import org.apache.tuweni.crypto.Hash;
 import org.apache.tuweni.junit.BouncyCastleExtension;
 
+import java.util.concurrent.atomic.AtomicReference;
+
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.ExtendWith;
 
 @ExtendWith(BouncyCastleExtension.class)
 class StateTest {
 
+  private static class PeerImpl implements Peer {
+
+  }
+
   private static class MockMessageSender implements MessageSender {
 
     Verb verb;
@@ -42,10 +48,12 @@ class StateTest {
     }
   }
 
+  private static final AtomicReference<Bytes> messageRef = new AtomicReference<>();
+
   @Test
   void testInitialState() {
     EphemeralPeerRepository repo = new EphemeralPeerRepository();
-    State state = new State(repo, Hash::keccak256, new MockMessageSender());
+    State state = new State(repo, Hash::keccak256, new MockMessageSender(), messageRef::set, (message, peer) -> true);
     assertTrue(repo.peers().isEmpty());
     assertTrue(repo.lazyPushPeers().isEmpty());
     assertTrue(repo.eagerPushPeers().isEmpty());
@@ -54,10 +62,10 @@ class StateTest {
   @Test
   void firstRoundWithThreePeers() {
     EphemeralPeerRepository repo = new EphemeralPeerRepository();
-    State state = new State(repo, Hash::keccak256, new MockMessageSender());
-    state.addPeer(new Peer());
-    state.addPeer(new Peer());
-    state.addPeer(new Peer());
+    State state = new State(repo, Hash::keccak256, new MockMessageSender(), messageRef::set, (message, peer) -> true);
+    state.addPeer(new PeerImpl());
+    state.addPeer(new PeerImpl());
+    state.addPeer(new PeerImpl());
     assertTrue(repo.lazyPushPeers().isEmpty());
     assertEquals(3, repo.eagerPushPeers().size());
   }
@@ -65,8 +73,8 @@ class StateTest {
   @Test
   void removePeer() {
     EphemeralPeerRepository repo = new EphemeralPeerRepository();
-    State state = new State(repo, Hash::keccak256, new MockMessageSender());
-    Peer peer = new Peer();
+    State state = new State(repo, Hash::keccak256, new MockMessageSender(), messageRef::set, (message, peer) -> true);
+    Peer peer = new PeerImpl();
     state.addPeer(peer);
     state.removePeer(peer);
     assertTrue(repo.peers().isEmpty());
@@ -77,8 +85,8 @@ class StateTest {
   @Test
   void prunePeer() {
     EphemeralPeerRepository repo = new EphemeralPeerRepository();
-    State state = new State(repo, Hash::keccak256, new MockMessageSender());
-    Peer peer = new Peer();
+    State state = new State(repo, Hash::keccak256, new MockMessageSender(), messageRef::set, (message, peer) -> true);
+    Peer peer = new PeerImpl();
     state.addPeer(peer);
     state.receivePruneMessage(peer);
     assertEquals(0, repo.eagerPushPeers().size());
@@ -88,8 +96,8 @@ class StateTest {
   @Test
   void graftPeer() {
     EphemeralPeerRepository repo = new EphemeralPeerRepository();
-    State state = new State(repo, Hash::keccak256, new MockMessageSender());
-    Peer peer = new Peer();
+    State state = new State(repo, Hash::keccak256, new MockMessageSender(), messageRef::set, (message, peer) -> true);
+    Peer peer = new PeerImpl();
     state.addPeer(peer);
     state.receivePruneMessage(peer);
     assertEquals(0, repo.eagerPushPeers().size());
@@ -103,13 +111,13 @@ class StateTest {
   void receiveFullMessageFromEagerPeer() {
     EphemeralPeerRepository repo = new EphemeralPeerRepository();
     MockMessageSender messageSender = new MockMessageSender();
-    State state = new State(repo, Hash::keccak256, messageSender);
-    Peer peer = new Peer();
+    State state = new State(repo, Hash::keccak256, messageSender, messageRef::set, (message, peer) -> true);
+    Peer peer = new PeerImpl();
     state.addPeer(peer);
-    Peer otherPeer = new Peer();
+    Peer otherPeer = new PeerImpl();
     state.addPeer(otherPeer);
     Bytes32 msg = Bytes32.random();
-    state.receiveFullMessage(peer, msg);
+    state.receiveGossipMessage(peer, msg);
     assertEquals(msg, messageSender.payload);
     assertEquals(otherPeer, messageSender.peer);
   }
@@ -118,16 +126,16 @@ class StateTest {
   void receiveFullMessageFromEagerPeerWithALazyPeer() {
     EphemeralPeerRepository repo = new EphemeralPeerRepository();
     MockMessageSender messageSender = new MockMessageSender();
-    State state = new State(repo, Hash::keccak256, messageSender);
-    Peer peer = new Peer();
+    State state = new State(repo, Hash::keccak256, messageSender, messageRef::set, (message, peer) -> true);
+    Peer peer = new PeerImpl();
     state.addPeer(peer);
-    Peer otherPeer = new Peer();
+    Peer otherPeer = new PeerImpl();
     state.addPeer(otherPeer);
     Bytes32 msg = Bytes32.random();
-    Peer lazyPeer = new Peer();
+    Peer lazyPeer = new PeerImpl();
     state.addPeer(lazyPeer);
     repo.moveToLazy(lazyPeer);
-    state.receiveFullMessage(peer, msg);
+    state.receiveGossipMessage(peer, msg);
     assertEquals(msg, messageSender.payload);
     assertEquals(otherPeer, messageSender.peer);
     state.processQueue();
@@ -140,15 +148,15 @@ class StateTest {
   void receiveFullMessageFromEagerPeerThenPartialMessageFromLazyPeer() {
     EphemeralPeerRepository repo = new EphemeralPeerRepository();
     MockMessageSender messageSender = new MockMessageSender();
-    State state = new State(repo, Hash::keccak256, messageSender);
-    Peer peer = new Peer();
+    State state = new State(repo, Hash::keccak256, messageSender, messageRef::set, (message, peer) -> true);
+    Peer peer = new PeerImpl();
     state.addPeer(peer);
-    Peer lazyPeer = new Peer();
+    Peer lazyPeer = new PeerImpl();
     state.addPeer(lazyPeer);
     repo.moveToLazy(lazyPeer);
     Bytes message = Bytes32.random();
-    state.receiveFullMessage(peer, message);
-    state.receiveHeaderMessage(lazyPeer, message);
+    state.receiveGossipMessage(peer, message);
+    state.receiveIHaveMessage(lazyPeer, message);
     assertNull(messageSender.payload);
     assertNull(messageSender.peer);
   }
@@ -157,14 +165,14 @@ class StateTest {
   void receivePartialMessageFromLazyPeerAndNoFullMessage() throws Exception {
     EphemeralPeerRepository repo = new EphemeralPeerRepository();
     MockMessageSender messageSender = new MockMessageSender();
-    State state = new State(repo, Hash::keccak256, messageSender, 100, 4000);
-    Peer peer = new Peer();
+    State state = new State(repo, Hash::keccak256, messageSender, messageRef::set, (message, peer) -> true, 100, 4000);
+    Peer peer = new PeerImpl();
     state.addPeer(peer);
-    Peer lazyPeer = new Peer();
+    Peer lazyPeer = new PeerImpl();
     state.addPeer(lazyPeer);
     repo.moveToLazy(lazyPeer);
     Bytes message = Bytes32.random();
-    state.receiveHeaderMessage(lazyPeer, message);
+    state.receiveIHaveMessage(lazyPeer, message);
     Thread.sleep(200);
     assertEquals(message, messageSender.payload);
     assertEquals(lazyPeer, messageSender.peer);
@@ -175,16 +183,16 @@ class StateTest {
   void receivePartialMessageFromLazyPeerAndThenFullMessage() throws Exception {
     EphemeralPeerRepository repo = new EphemeralPeerRepository();
     MockMessageSender messageSender = new MockMessageSender();
-    State state = new State(repo, Hash::keccak256, messageSender, 500, 4000);
-    Peer peer = new Peer();
+    State state = new State(repo, Hash::keccak256, messageSender, messageRef::set, (message, peer) -> true, 500, 4000);
+    Peer peer = new PeerImpl();
     state.addPeer(peer);
-    Peer lazyPeer = new Peer();
+    Peer lazyPeer = new PeerImpl();
     state.addPeer(lazyPeer);
     repo.moveToLazy(lazyPeer);
     Bytes message = Bytes32.random();
-    state.receiveHeaderMessage(lazyPeer, Hash.keccak256(message));
+    state.receiveIHaveMessage(lazyPeer, Hash.keccak256(message));
     Thread.sleep(100);
-    state.receiveFullMessage(peer, message);
+    state.receiveGossipMessage(peer, message);
     Thread.sleep(500);
     assertNull(messageSender.verb);
     assertNull(messageSender.payload);
@@ -195,10 +203,10 @@ class StateTest {
   void receiveFullMessageFromUnknownPeer() {
     EphemeralPeerRepository repo = new EphemeralPeerRepository();
     MockMessageSender messageSender = new MockMessageSender();
-    State state = new State(repo, Hash::keccak256, messageSender);
-    Peer peer = new Peer();
+    State state = new State(repo, Hash::keccak256, messageSender, messageRef::set, (message, peer) -> true);
+    Peer peer = new PeerImpl();
     Bytes message = Bytes32.random();
-    state.receiveFullMessage(peer, message);
+    state.receiveGossipMessage(peer, message);
     assertEquals(1, repo.eagerPushPeers().size());
     assertEquals(0, repo.lazyPushPeers().size());
     assertEquals(peer, repo.eagerPushPeers().iterator().next());
@@ -208,12 +216,12 @@ class StateTest {
   void prunePeerWhenReceivingTwiceTheSameFullMessage() {
     EphemeralPeerRepository repo = new EphemeralPeerRepository();
     MockMessageSender messageSender = new MockMessageSender();
-    State state = new State(repo, Hash::keccak256, messageSender);
-    Peer peer = new Peer();
-    Peer secondPeer = new Peer();
+    State state = new State(repo, Hash::keccak256, messageSender, messageRef::set, (message, peer) -> true);
+    Peer peer = new PeerImpl();
+    Peer secondPeer = new PeerImpl();
     Bytes message = Bytes32.random();
-    state.receiveFullMessage(peer, message);
-    state.receiveFullMessage(secondPeer, message);
+    state.receiveGossipMessage(peer, message);
+    state.receiveGossipMessage(secondPeer, message);
     assertEquals(1, repo.eagerPushPeers().size());
     assertEquals(1, repo.lazyPushPeers().size());
     assertNull(messageSender.payload);
diff --git a/plumtree/src/test/java/org/apache/tuweni/plumtree/vertx/VertxGossipServerTest.java b/plumtree/src/test/java/org/apache/tuweni/plumtree/vertx/VertxGossipServerTest.java
new file mode 100644
index 0000000..f465afb
--- /dev/null
+++ b/plumtree/src/test/java/org/apache/tuweni/plumtree/vertx/VertxGossipServerTest.java
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.tuweni.plumtree.vertx;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import org.apache.tuweni.bytes.Bytes;
+import org.apache.tuweni.junit.VertxExtension;
+import org.apache.tuweni.junit.VertxInstance;
+import org.apache.tuweni.plumtree.EphemeralPeerRepository;
+
+import java.util.concurrent.atomic.AtomicReference;
+
+import io.vertx.core.Vertx;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+
+@ExtendWith(VertxExtension.class)
+class VertxGossipServerTest {
+
+  @Test
+  void gossipDeadBeefToOtherNode(@VertxInstance Vertx vertx) throws Exception {
+
+    AtomicReference<Bytes> messageReceived1 = new AtomicReference<>();
+    AtomicReference<Bytes> messageReceived2 = new AtomicReference<>();
+
+    VertxGossipServer server1 = new VertxGossipServer(
+        vertx,
+        "127.0.0.1",
+        10000,
+        bytes -> bytes,
+        new EphemeralPeerRepository(),
+        messageReceived1::set,
+        (message, peer) -> true);
+    VertxGossipServer server2 = new VertxGossipServer(
+        vertx,
+        "127.0.0.1",
+        10001,
+        bytes -> bytes,
+        new EphemeralPeerRepository(),
+        messageReceived2::set,
+        (message, peer) -> true);
+
+    server1.start().join();
+    server2.start().join();
+
+    server1.connectTo("127.0.0.1", 10001).join();
+    server1.gossip(Bytes.fromHexString("deadbeef"));
+    Thread.sleep(1000);
+    assertEquals(Bytes.fromHexString("deadbeef"), messageReceived2.get());
+
+    server1.stop().join();
+    server2.stop().join();
+  }
+
+  @Test
+  void gossipDeadBeefToTwoOtherNodes(@VertxInstance Vertx vertx) throws Exception {
+
+    AtomicReference<Bytes> messageReceived1 = new AtomicReference<>();
+    AtomicReference<Bytes> messageReceived2 = new AtomicReference<>();
+    AtomicReference<Bytes> messageReceived3 = new AtomicReference<>();
+
+    VertxGossipServer server1 = new VertxGossipServer(
+        vertx,
+        "127.0.0.1",
+        10000,
+        bytes -> bytes,
+        new EphemeralPeerRepository(),
+        messageReceived1::set,
+        (message, peer) -> true);
+    VertxGossipServer server2 = new VertxGossipServer(
+        vertx,
+        "127.0.0.1",
+        10001,
+        bytes -> bytes,
+        new EphemeralPeerRepository(),
+        messageReceived2::set,
+        (message, peer) -> true);
+    VertxGossipServer server3 = new VertxGossipServer(
+        vertx,
+        "127.0.0.1",
+        10002,
+        bytes -> bytes,
+        new EphemeralPeerRepository(),
+        messageReceived3::set,
+        (message, peer) -> true);
+
+    server1.start().join();
+    server2.start().join();
+    server3.start().join();
+
+    server1.connectTo("127.0.0.1", 10001).join();
+    server3.connectTo("127.0.0.1", 10001).join();
+    server1.gossip(Bytes.fromHexString("deadbeef"));
+    Thread.sleep(1000);
+    assertEquals(Bytes.fromHexString("deadbeef"), messageReceived2.get());
+    assertEquals(Bytes.fromHexString("deadbeef"), messageReceived3.get());
+
+    server1.stop().join();
+    server2.stop().join();
+  }
+
+  @Test
+  void gossipCollision(@VertxInstance Vertx vertx) throws Exception {
+    AtomicReference<Bytes> messageReceived1 = new AtomicReference<>();
+    AtomicReference<Bytes> messageReceived2 = new AtomicReference<>();
+
+    EphemeralPeerRepository peerRepository1 = new EphemeralPeerRepository();
+    EphemeralPeerRepository peerRepository3 = new EphemeralPeerRepository();
+
+    VertxGossipServer server1 = new VertxGossipServer(
+        vertx,
+        "127.0.0.1",
+        10000,
+        bytes -> bytes,
+        peerRepository1,
+        messageReceived1::set,
+        (message, peer) -> true);
+    VertxGossipServer server2 = new VertxGossipServer(
+        vertx,
+        "127.0.0.1",
+        10001,
+        bytes -> bytes,
+        new EphemeralPeerRepository(),
+        messageReceived2::set,
+        (message, peer) -> true);
+    VertxGossipServer server3 = new VertxGossipServer(
+        vertx,
+        "127.0.0.1",
+        10002,
+        bytes -> bytes,
+        peerRepository3,
+        messageReceived2::set,
+        (message, peer) -> true);
+
+    server1.start().join();
+    server2.start().join();
+    server3.start().join();
+
+    server1.connectTo("127.0.0.1", 10001).join();
+    server2.connectTo("127.0.0.1", 10002).join();
+    server1.connectTo("127.0.0.1", 10002).join();
+    server1.gossip(Bytes.fromHexString("deadbeef"));
+    Thread.sleep(1000);
+    assertEquals(Bytes.fromHexString("deadbeef"), messageReceived2.get());
+    Thread.sleep(1000);
+
+    assertTrue(peerRepository1.lazyPushPeers().size() == 1 || peerRepository3.lazyPushPeers().size() == 1);
+
+    server1.stop().join();
+    server2.stop().join();
+    server3.stop().join();
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@tuweni.apache.org
For additional commands, e-mail: commits-help@tuweni.apache.org