You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tuweni.apache.org by to...@apache.org on 2019/04/27 08:11:42 UTC

[incubator-tuweni] 34/49: added message type to gossip header

This is an automated email from the ASF dual-hosted git repository.

toulmean pushed a commit to branch 1.0
in repository https://gitbox.apache.org/repos/asf/incubator-tuweni.git

commit 7324371957196b9bedd404ed607dbc7c8f9b296f
Author: jonny rhea <jo...@gmail.com>
AuthorDate: Mon Apr 22 18:10:49 2019 -0500

    added message type to gossip header
---
 .../org/apache/tuweni/plumtree/MessageSender.java  | 13 ++++++--
 .../java/org/apache/tuweni/plumtree/State.java     | 25 +++++++++------
 .../tuweni/plumtree/vertx/VertxGossipServer.java   | 10 +++---
 .../java/org/apache/tuweni/plumtree/StateTest.java | 37 +++++++++++++++++-----
 .../plumtree/vertx/VertxGossipServerTest.java      |  7 ++--
 5 files changed, 65 insertions(+), 27 deletions(-)

diff --git a/plumtree/src/main/java/org/apache/tuweni/plumtree/MessageSender.java b/plumtree/src/main/java/org/apache/tuweni/plumtree/MessageSender.java
index ea18d67..5ff8bb6 100644
--- a/plumtree/src/main/java/org/apache/tuweni/plumtree/MessageSender.java
+++ b/plumtree/src/main/java/org/apache/tuweni/plumtree/MessageSender.java
@@ -22,19 +22,28 @@ import javax.annotation.Nullable;
 public interface MessageSender {
 
   /**
-   * Types of message supported by the dialect
+   * Types of verbs supported by the dialect
    */
   enum Verb {
     IHAVE, GRAFT, PRUNE, GOSSIP
   }
 
   /**
+   * Types of message supported by the dialect
+   */
+  enum Type {
+    BLOCK, ATTESTATION, NADA
+  }
+
+  /**
    * Sends bytes to a peer.
    * 
    * @param verb the type of message
+   * @param type the type of message
    * @param peer the target of the message
    * @param hash the hash of the message
    * @param payload the bytes to send
    */
-  void sendMessage(Verb verb, Peer peer, Bytes hash, @Nullable Bytes payload);
+  void sendMessage(Verb verb, Type type, Peer peer, Bytes hash, @Nullable Bytes payload);
+
 }
diff --git a/plumtree/src/main/java/org/apache/tuweni/plumtree/State.java b/plumtree/src/main/java/org/apache/tuweni/plumtree/State.java
index b65fdbe..5b199e0 100644
--- a/plumtree/src/main/java/org/apache/tuweni/plumtree/State.java
+++ b/plumtree/src/main/java/org/apache/tuweni/plumtree/State.java
@@ -71,7 +71,7 @@ public final class State {
      * @param sender the sender - may be null if we are submitting this message to the network
      * @param message the payload to send to the network
      */
-    void fullMessageReceived(@Nullable Peer sender, Bytes message) {
+    void fullMessageReceived(@Nullable Peer sender, MessageSender.Type type, Bytes message) {
       if (receivedFullMessage.compareAndSet(false, true)) {
         for (TimerTask task : tasks) {
           task.cancel();
@@ -80,7 +80,7 @@ public final class State {
         if (sender == null || messageValidator.validate(message, sender)) {
           for (Peer peer : peerRepository.eagerPushPeers()) {
             if (sender == null || !sender.equals(peer)) {
-              messageSender.sendMessage(MessageSender.Verb.GOSSIP, peer, hash, message);
+              messageSender.sendMessage(MessageSender.Verb.GOSSIP, type, peer, hash, message);
             }
           }
           lazyQueue.addAll(
@@ -88,7 +88,9 @@ public final class State {
                   .lazyPushPeers()
                   .stream()
                   .filter(p -> !lazyPeers.contains(p))
-                  .map(peer -> (Runnable) (() -> messageSender.sendMessage(MessageSender.Verb.IHAVE, peer, hash, null)))
+                  .map(
+                      peer -> (Runnable) (() -> messageSender
+                          .sendMessage(MessageSender.Verb.IHAVE, MessageSender.Type.NADA, peer, hash, null)))
                   .collect(Collectors.toList()));
           if (sender != null) {
             messageListener.accept(message);
@@ -96,7 +98,7 @@ public final class State {
         }
       } else {
         if (sender != null) {
-          messageSender.sendMessage(MessageSender.Verb.PRUNE, sender, hash, null);
+          messageSender.sendMessage(MessageSender.Verb.PRUNE, MessageSender.Type.NADA, sender, hash, null);
           peerRepository.moveToLazy(sender);
         }
       }
@@ -110,7 +112,8 @@ public final class State {
           if (newPeerIndex == lazyPeers.size()) {
             newPeerIndex = 0;
           }
-          messageSender.sendMessage(MessageSender.Verb.GRAFT, lazyPeers.get(index), hash, null);
+          messageSender
+              .sendMessage(MessageSender.Verb.GRAFT, MessageSender.Type.NADA, lazyPeers.get(index), hash, null);
           scheduleGraftMessage(newPeerIndex++);
         }
       };
@@ -203,12 +206,13 @@ public final class State {
    * Records a message was received in full from a peer.
    *
    * @param peer the peer that sent the message
+   * @param type message type of the message
    * @param message the hash of the message
    */
-  public void receiveGossipMessage(Peer peer, Bytes message) {
+  public void receiveGossipMessage(Peer peer, MessageSender.Type type, Bytes message) {
     peerRepository.considerNewPeer(peer);
     MessageHandler handler = messageHandlers.computeIfAbsent(messageHashingFunction.hash(message), MessageHandler::new);
-    handler.fullMessageReceived(peer, message);
+    handler.fullMessageReceived(peer, type, message);
   }
 
   /**
@@ -239,19 +243,20 @@ public final class State {
    */
   public void receiveGraftMessage(Peer peer, Bytes messageHash) {
     peerRepository.moveToEager(peer);
-    messageSender.sendMessage(MessageSender.Verb.GOSSIP, peer, messageHash, null);
+    messageSender.sendMessage(MessageSender.Verb.GOSSIP, MessageSender.Type.NADA, peer, messageHash, null);
   }
 
   /**
    * Sends a gossip message to all peers, according to their status.
    * 
    * @param message the message to propagate
+   * @param type message type of the message
    * @return The associated hash of the message
    */
-  public Bytes sendGossipMessage(Bytes message) {
+  public Bytes sendGossipMessage(MessageSender.Type type, Bytes message) {
     Bytes messageHash = messageHashingFunction.hash(message);
     MessageHandler handler = messageHandlers.computeIfAbsent(messageHash, MessageHandler::new);
-    handler.fullMessageReceived(null, message);
+    handler.fullMessageReceived(null, type, message);
     return messageHash;
   }
 
diff --git a/plumtree/src/main/java/org/apache/tuweni/plumtree/vertx/VertxGossipServer.java b/plumtree/src/main/java/org/apache/tuweni/plumtree/vertx/VertxGossipServer.java
index fe2cf24..ae10f43 100644
--- a/plumtree/src/main/java/org/apache/tuweni/plumtree/vertx/VertxGossipServer.java
+++ b/plumtree/src/main/java/org/apache/tuweni/plumtree/vertx/VertxGossipServer.java
@@ -47,6 +47,7 @@ public final class VertxGossipServer {
   private static final class Message {
 
     public MessageSender.Verb verb;
+    public MessageSender.Type type;
     public String hash;
     public String payload;
   }
@@ -77,7 +78,7 @@ public final class VertxGossipServer {
           state.receiveIHaveMessage(peer, Bytes.fromHexString(message.payload));
           break;
         case GOSSIP:
-          state.receiveGossipMessage(peer, Bytes.fromHexString(message.payload));
+          state.receiveGossipMessage(peer, message.type, Bytes.fromHexString(message.payload));
           break;
         case GRAFT:
           state.receiveGraftMessage(peer, Bytes.fromHexString(message.payload));
@@ -136,9 +137,10 @@ public final class VertxGossipServer {
         if (res.failed()) {
           completion.completeExceptionally(res.cause());
         } else {
-          state = new State(peerRepository, messageHashing, (verb, peer, hash, payload) -> {
+          state = new State(peerRepository, messageHashing, (verb, type, peer, hash, payload) -> {
             Message message = new Message();
             message.verb = verb;
+            message.type = type;
             message.hash = hash.toHexString();
             message.payload = payload == null ? null : payload.toHexString();
             try {
@@ -201,10 +203,10 @@ public final class VertxGossipServer {
    *
    * @param message the payload to propagate
    */
-  public void gossip(Bytes message) {
+  public void gossip(MessageSender.Type type, Bytes message) {
     if (!started.get()) {
       throw new IllegalStateException("Server has not started");
     }
-    state.sendGossipMessage(message);
+    state.sendGossipMessage(type, message);
   }
 }
diff --git a/plumtree/src/test/java/org/apache/tuweni/plumtree/StateTest.java b/plumtree/src/test/java/org/apache/tuweni/plumtree/StateTest.java
index be5f110..f245bbc 100644
--- a/plumtree/src/test/java/org/apache/tuweni/plumtree/StateTest.java
+++ b/plumtree/src/test/java/org/apache/tuweni/plumtree/StateTest.java
@@ -36,13 +36,15 @@ class StateTest {
   private static class MockMessageSender implements MessageSender {
 
     Verb verb;
+    Type type;
     Peer peer;
     Bytes hash;
     Bytes payload;
 
     @Override
-    public void sendMessage(Verb verb, Peer peer, Bytes hash, Bytes payload) {
+    public void sendMessage(Verb verb, Type type, Peer peer, Bytes hash, Bytes payload) {
       this.verb = verb;
+      this.type = type;
       this.peer = peer;
       this.hash = hash;
       this.payload = payload;
@@ -73,6 +75,25 @@ class StateTest {
   }
 
   @Test
+  void firstRoundWithTwoPeers() {
+    EphemeralPeerRepository repo = new EphemeralPeerRepository();
+    State state = new State(repo, Hash::keccak256, new MockMessageSender(), messageRef::set, (message, peer) -> true);
+    state.addPeer(new PeerImpl());
+    state.addPeer(new PeerImpl());
+    assertTrue(repo.lazyPushPeers().isEmpty());
+    assertEquals(2, repo.eagerPushPeers().size());
+  }
+
+  @Test
+  void firstRoundWithOnePeer() {
+    EphemeralPeerRepository repo = new EphemeralPeerRepository();
+    State state = new State(repo, Hash::keccak256, new MockMessageSender(), messageRef::set, (message, peer) -> true);
+    state.addPeer(new PeerImpl());
+    assertTrue(repo.lazyPushPeers().isEmpty());
+    assertEquals(1, repo.eagerPushPeers().size());
+  }
+
+  @Test
   void removePeer() {
     EphemeralPeerRepository repo = new EphemeralPeerRepository();
     State state = new State(repo, Hash::keccak256, new MockMessageSender(), messageRef::set, (message, peer) -> true);
@@ -119,7 +140,7 @@ class StateTest {
     Peer otherPeer = new PeerImpl();
     state.addPeer(otherPeer);
     Bytes32 msg = Bytes32.random();
-    state.receiveGossipMessage(peer, msg);
+    state.receiveGossipMessage(peer, MessageSender.Type.BLOCK, msg);
     assertEquals(msg, messageSender.payload);
     assertEquals(otherPeer, messageSender.peer);
   }
@@ -137,7 +158,7 @@ class StateTest {
     Peer lazyPeer = new PeerImpl();
     state.addPeer(lazyPeer);
     repo.moveToLazy(lazyPeer);
-    state.receiveGossipMessage(peer, msg);
+    state.receiveGossipMessage(peer, MessageSender.Type.BLOCK, msg);
     assertEquals(msg, messageSender.payload);
     assertEquals(otherPeer, messageSender.peer);
     state.processQueue();
@@ -157,7 +178,7 @@ class StateTest {
     state.addPeer(lazyPeer);
     repo.moveToLazy(lazyPeer);
     Bytes message = Bytes32.random();
-    state.receiveGossipMessage(peer, message);
+    state.receiveGossipMessage(peer, MessageSender.Type.BLOCK, message);
     state.receiveIHaveMessage(lazyPeer, message);
     assertNull(messageSender.payload);
     assertNull(messageSender.peer);
@@ -194,7 +215,7 @@ class StateTest {
     Bytes message = Bytes32.random();
     state.receiveIHaveMessage(lazyPeer, Hash.keccak256(message));
     Thread.sleep(100);
-    state.receiveGossipMessage(peer, message);
+    state.receiveGossipMessage(peer, MessageSender.Type.BLOCK, message);
     Thread.sleep(500);
     assertNull(messageSender.verb);
     assertNull(messageSender.payload);
@@ -208,7 +229,7 @@ class StateTest {
     State state = new State(repo, Hash::keccak256, messageSender, messageRef::set, (message, peer) -> true);
     Peer peer = new PeerImpl();
     Bytes message = Bytes32.random();
-    state.receiveGossipMessage(peer, message);
+    state.receiveGossipMessage(peer, MessageSender.Type.BLOCK, message);
     assertEquals(1, repo.eagerPushPeers().size());
     assertEquals(0, repo.lazyPushPeers().size());
     assertEquals(peer, repo.eagerPushPeers().iterator().next());
@@ -222,8 +243,8 @@ class StateTest {
     Peer peer = new PeerImpl();
     Peer secondPeer = new PeerImpl();
     Bytes message = Bytes32.random();
-    state.receiveGossipMessage(peer, message);
-    state.receiveGossipMessage(secondPeer, message);
+    state.receiveGossipMessage(peer, MessageSender.Type.BLOCK, message);
+    state.receiveGossipMessage(secondPeer, MessageSender.Type.BLOCK, message);
     assertEquals(1, repo.eagerPushPeers().size());
     assertEquals(1, repo.lazyPushPeers().size());
     assertNull(messageSender.payload);
diff --git a/plumtree/src/test/java/org/apache/tuweni/plumtree/vertx/VertxGossipServerTest.java b/plumtree/src/test/java/org/apache/tuweni/plumtree/vertx/VertxGossipServerTest.java
index d6e6106..e6ebe71 100644
--- a/plumtree/src/test/java/org/apache/tuweni/plumtree/vertx/VertxGossipServerTest.java
+++ b/plumtree/src/test/java/org/apache/tuweni/plumtree/vertx/VertxGossipServerTest.java
@@ -20,6 +20,7 @@ import org.apache.tuweni.bytes.Bytes;
 import org.apache.tuweni.junit.VertxExtension;
 import org.apache.tuweni.junit.VertxInstance;
 import org.apache.tuweni.plumtree.EphemeralPeerRepository;
+import org.apache.tuweni.plumtree.MessageSender;
 
 import java.util.concurrent.atomic.AtomicReference;
 
@@ -57,7 +58,7 @@ class VertxGossipServerTest {
     server2.start().join();
 
     server1.connectTo("127.0.0.1", 10001).join();
-    server1.gossip(Bytes.fromHexString("deadbeef"));
+    server1.gossip(MessageSender.Type.BLOCK, Bytes.fromHexString("deadbeef"));
     Thread.sleep(1000);
     assertEquals(Bytes.fromHexString("deadbeef"), messageReceived2.get());
 
@@ -103,7 +104,7 @@ class VertxGossipServerTest {
 
     server1.connectTo("127.0.0.1", 10001).join();
     server3.connectTo("127.0.0.1", 10001).join();
-    server1.gossip(Bytes.fromHexString("deadbeef"));
+    server1.gossip(MessageSender.Type.BLOCK, Bytes.fromHexString("deadbeef"));
     Thread.sleep(1000);
     assertEquals(Bytes.fromHexString("deadbeef"), messageReceived2.get());
     assertEquals(Bytes.fromHexString("deadbeef"), messageReceived3.get());
@@ -153,7 +154,7 @@ class VertxGossipServerTest {
     server1.connectTo("127.0.0.1", 10001).join();
     server2.connectTo("127.0.0.1", 10002).join();
     server1.connectTo("127.0.0.1", 10002).join();
-    server1.gossip(Bytes.fromHexString("deadbeef"));
+    server1.gossip(MessageSender.Type.BLOCK, Bytes.fromHexString("deadbeef"));
     Thread.sleep(1000);
     assertEquals(Bytes.fromHexString("deadbeef"), messageReceived2.get());
     Thread.sleep(1000);


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@tuweni.apache.org
For additional commands, e-mail: commits-help@tuweni.apache.org