You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tuweni.apache.org by to...@apache.org on 2019/04/23 19:36:38 UTC
[incubator-tuweni] 34/40: added message type to gossip header
This is an automated email from the ASF dual-hosted git repository.
toulmean pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-tuweni.git
commit 7324371957196b9bedd404ed607dbc7c8f9b296f
Author: jonny rhea <jo...@gmail.com>
AuthorDate: Mon Apr 22 18:10:49 2019 -0500
added message type to gossip header
---
.../org/apache/tuweni/plumtree/MessageSender.java | 13 ++++++--
.../java/org/apache/tuweni/plumtree/State.java | 25 +++++++++------
.../tuweni/plumtree/vertx/VertxGossipServer.java | 10 +++---
.../java/org/apache/tuweni/plumtree/StateTest.java | 37 +++++++++++++++++-----
.../plumtree/vertx/VertxGossipServerTest.java | 7 ++--
5 files changed, 65 insertions(+), 27 deletions(-)
diff --git a/plumtree/src/main/java/org/apache/tuweni/plumtree/MessageSender.java b/plumtree/src/main/java/org/apache/tuweni/plumtree/MessageSender.java
index ea18d67..5ff8bb6 100644
--- a/plumtree/src/main/java/org/apache/tuweni/plumtree/MessageSender.java
+++ b/plumtree/src/main/java/org/apache/tuweni/plumtree/MessageSender.java
@@ -22,19 +22,28 @@ import javax.annotation.Nullable;
public interface MessageSender {
/**
- * Types of message supported by the dialect
+ * Types of verbs supported by the dialect
*/
enum Verb {
IHAVE, GRAFT, PRUNE, GOSSIP
}
/**
+ * Types of message supported by the dialect
+ */
+ enum Type {
+ BLOCK, ATTESTATION, NADA
+ }
+
+ /**
* Sends bytes to a peer.
*
* @param verb the type of message
+ * @param type the type of payload carried by the message (see {@link Type})
* @param peer the target of the message
* @param hash the hash of the message
* @param payload the bytes to send
*/
- void sendMessage(Verb verb, Peer peer, Bytes hash, @Nullable Bytes payload);
+ void sendMessage(Verb verb, Type type, Peer peer, Bytes hash, @Nullable Bytes payload);
+
}
diff --git a/plumtree/src/main/java/org/apache/tuweni/plumtree/State.java b/plumtree/src/main/java/org/apache/tuweni/plumtree/State.java
index b65fdbe..5b199e0 100644
--- a/plumtree/src/main/java/org/apache/tuweni/plumtree/State.java
+++ b/plumtree/src/main/java/org/apache/tuweni/plumtree/State.java
@@ -71,7 +71,7 @@ public final class State {
* @param sender the sender - may be null if we are submitting this message to the network
* @param message the payload to send to the network
*/
- void fullMessageReceived(@Nullable Peer sender, Bytes message) {
+ void fullMessageReceived(@Nullable Peer sender, MessageSender.Type type, Bytes message) {
if (receivedFullMessage.compareAndSet(false, true)) {
for (TimerTask task : tasks) {
task.cancel();
@@ -80,7 +80,7 @@ public final class State {
if (sender == null || messageValidator.validate(message, sender)) {
for (Peer peer : peerRepository.eagerPushPeers()) {
if (sender == null || !sender.equals(peer)) {
- messageSender.sendMessage(MessageSender.Verb.GOSSIP, peer, hash, message);
+ messageSender.sendMessage(MessageSender.Verb.GOSSIP, type, peer, hash, message);
}
}
lazyQueue.addAll(
@@ -88,7 +88,9 @@ public final class State {
.lazyPushPeers()
.stream()
.filter(p -> !lazyPeers.contains(p))
- .map(peer -> (Runnable) (() -> messageSender.sendMessage(MessageSender.Verb.IHAVE, peer, hash, null)))
+ .map(
+ peer -> (Runnable) (() -> messageSender
+ .sendMessage(MessageSender.Verb.IHAVE, MessageSender.Type.NADA, peer, hash, null)))
.collect(Collectors.toList()));
if (sender != null) {
messageListener.accept(message);
@@ -96,7 +98,7 @@ public final class State {
}
} else {
if (sender != null) {
- messageSender.sendMessage(MessageSender.Verb.PRUNE, sender, hash, null);
+ messageSender.sendMessage(MessageSender.Verb.PRUNE, MessageSender.Type.NADA, sender, hash, null);
peerRepository.moveToLazy(sender);
}
}
@@ -110,7 +112,8 @@ public final class State {
if (newPeerIndex == lazyPeers.size()) {
newPeerIndex = 0;
}
- messageSender.sendMessage(MessageSender.Verb.GRAFT, lazyPeers.get(index), hash, null);
+ messageSender
+ .sendMessage(MessageSender.Verb.GRAFT, MessageSender.Type.NADA, lazyPeers.get(index), hash, null);
scheduleGraftMessage(newPeerIndex++);
}
};
@@ -203,12 +206,13 @@ public final class State {
* Records a message was received in full from a peer.
*
* @param peer the peer that sent the message
+ * @param type the type of the message
* @param message the hash of the message
*/
- public void receiveGossipMessage(Peer peer, Bytes message) {
+ public void receiveGossipMessage(Peer peer, MessageSender.Type type, Bytes message) {
peerRepository.considerNewPeer(peer);
MessageHandler handler = messageHandlers.computeIfAbsent(messageHashingFunction.hash(message), MessageHandler::new);
- handler.fullMessageReceived(peer, message);
+ handler.fullMessageReceived(peer, type, message);
}
/**
@@ -239,19 +243,20 @@ public final class State {
*/
public void receiveGraftMessage(Peer peer, Bytes messageHash) {
peerRepository.moveToEager(peer);
- messageSender.sendMessage(MessageSender.Verb.GOSSIP, peer, messageHash, null);
+ messageSender.sendMessage(MessageSender.Verb.GOSSIP, MessageSender.Type.NADA, peer, messageHash, null);
}
/**
* Sends a gossip message to all peers, according to their status.
*
* @param message the message to propagate
+ * @param type the type of the message
* @return The associated hash of the message
*/
- public Bytes sendGossipMessage(Bytes message) {
+ public Bytes sendGossipMessage(MessageSender.Type type, Bytes message) {
Bytes messageHash = messageHashingFunction.hash(message);
MessageHandler handler = messageHandlers.computeIfAbsent(messageHash, MessageHandler::new);
- handler.fullMessageReceived(null, message);
+ handler.fullMessageReceived(null, type, message);
return messageHash;
}
diff --git a/plumtree/src/main/java/org/apache/tuweni/plumtree/vertx/VertxGossipServer.java b/plumtree/src/main/java/org/apache/tuweni/plumtree/vertx/VertxGossipServer.java
index fe2cf24..ae10f43 100644
--- a/plumtree/src/main/java/org/apache/tuweni/plumtree/vertx/VertxGossipServer.java
+++ b/plumtree/src/main/java/org/apache/tuweni/plumtree/vertx/VertxGossipServer.java
@@ -47,6 +47,7 @@ public final class VertxGossipServer {
private static final class Message {
public MessageSender.Verb verb;
+ public MessageSender.Type type;
public String hash;
public String payload;
}
@@ -77,7 +78,7 @@ public final class VertxGossipServer {
state.receiveIHaveMessage(peer, Bytes.fromHexString(message.payload));
break;
case GOSSIP:
- state.receiveGossipMessage(peer, Bytes.fromHexString(message.payload));
+ state.receiveGossipMessage(peer, message.type, Bytes.fromHexString(message.payload));
break;
case GRAFT:
state.receiveGraftMessage(peer, Bytes.fromHexString(message.payload));
@@ -136,9 +137,10 @@ public final class VertxGossipServer {
if (res.failed()) {
completion.completeExceptionally(res.cause());
} else {
- state = new State(peerRepository, messageHashing, (verb, peer, hash, payload) -> {
+ state = new State(peerRepository, messageHashing, (verb, type, peer, hash, payload) -> {
Message message = new Message();
message.verb = verb;
+ message.type = type;
message.hash = hash.toHexString();
message.payload = payload == null ? null : payload.toHexString();
try {
@@ -201,10 +203,10 @@ public final class VertxGossipServer {
*
* @param message the payload to propagate
*/
- public void gossip(Bytes message) {
+ public void gossip(MessageSender.Type type, Bytes message) {
if (!started.get()) {
throw new IllegalStateException("Server has not started");
}
- state.sendGossipMessage(message);
+ state.sendGossipMessage(type, message);
}
}
diff --git a/plumtree/src/test/java/org/apache/tuweni/plumtree/StateTest.java b/plumtree/src/test/java/org/apache/tuweni/plumtree/StateTest.java
index be5f110..f245bbc 100644
--- a/plumtree/src/test/java/org/apache/tuweni/plumtree/StateTest.java
+++ b/plumtree/src/test/java/org/apache/tuweni/plumtree/StateTest.java
@@ -36,13 +36,15 @@ class StateTest {
private static class MockMessageSender implements MessageSender {
Verb verb;
+ Type type;
Peer peer;
Bytes hash;
Bytes payload;
@Override
- public void sendMessage(Verb verb, Peer peer, Bytes hash, Bytes payload) {
+ public void sendMessage(Verb verb, Type type, Peer peer, Bytes hash, Bytes payload) {
this.verb = verb;
+ this.type = type;
this.peer = peer;
this.hash = hash;
this.payload = payload;
@@ -73,6 +75,25 @@ class StateTest {
}
@Test
+ void firstRoundWithTwoPeers() {
+ EphemeralPeerRepository repo = new EphemeralPeerRepository();
+ State state = new State(repo, Hash::keccak256, new MockMessageSender(), messageRef::set, (message, peer) -> true);
+ state.addPeer(new PeerImpl());
+ state.addPeer(new PeerImpl());
+ assertTrue(repo.lazyPushPeers().isEmpty());
+ assertEquals(2, repo.eagerPushPeers().size());
+ }
+
+ @Test
+ void firstRoundWithOnePeer() {
+ EphemeralPeerRepository repo = new EphemeralPeerRepository();
+ State state = new State(repo, Hash::keccak256, new MockMessageSender(), messageRef::set, (message, peer) -> true);
+ state.addPeer(new PeerImpl());
+ assertTrue(repo.lazyPushPeers().isEmpty());
+ assertEquals(1, repo.eagerPushPeers().size());
+ }
+
+ @Test
void removePeer() {
EphemeralPeerRepository repo = new EphemeralPeerRepository();
State state = new State(repo, Hash::keccak256, new MockMessageSender(), messageRef::set, (message, peer) -> true);
@@ -119,7 +140,7 @@ class StateTest {
Peer otherPeer = new PeerImpl();
state.addPeer(otherPeer);
Bytes32 msg = Bytes32.random();
- state.receiveGossipMessage(peer, msg);
+ state.receiveGossipMessage(peer, MessageSender.Type.BLOCK, msg);
assertEquals(msg, messageSender.payload);
assertEquals(otherPeer, messageSender.peer);
}
@@ -137,7 +158,7 @@ class StateTest {
Peer lazyPeer = new PeerImpl();
state.addPeer(lazyPeer);
repo.moveToLazy(lazyPeer);
- state.receiveGossipMessage(peer, msg);
+ state.receiveGossipMessage(peer, MessageSender.Type.BLOCK, msg);
assertEquals(msg, messageSender.payload);
assertEquals(otherPeer, messageSender.peer);
state.processQueue();
@@ -157,7 +178,7 @@ class StateTest {
state.addPeer(lazyPeer);
repo.moveToLazy(lazyPeer);
Bytes message = Bytes32.random();
- state.receiveGossipMessage(peer, message);
+ state.receiveGossipMessage(peer, MessageSender.Type.BLOCK, message);
state.receiveIHaveMessage(lazyPeer, message);
assertNull(messageSender.payload);
assertNull(messageSender.peer);
@@ -194,7 +215,7 @@ class StateTest {
Bytes message = Bytes32.random();
state.receiveIHaveMessage(lazyPeer, Hash.keccak256(message));
Thread.sleep(100);
- state.receiveGossipMessage(peer, message);
+ state.receiveGossipMessage(peer, MessageSender.Type.BLOCK, message);
Thread.sleep(500);
assertNull(messageSender.verb);
assertNull(messageSender.payload);
@@ -208,7 +229,7 @@ class StateTest {
State state = new State(repo, Hash::keccak256, messageSender, messageRef::set, (message, peer) -> true);
Peer peer = new PeerImpl();
Bytes message = Bytes32.random();
- state.receiveGossipMessage(peer, message);
+ state.receiveGossipMessage(peer, MessageSender.Type.BLOCK, message);
assertEquals(1, repo.eagerPushPeers().size());
assertEquals(0, repo.lazyPushPeers().size());
assertEquals(peer, repo.eagerPushPeers().iterator().next());
@@ -222,8 +243,8 @@ class StateTest {
Peer peer = new PeerImpl();
Peer secondPeer = new PeerImpl();
Bytes message = Bytes32.random();
- state.receiveGossipMessage(peer, message);
- state.receiveGossipMessage(secondPeer, message);
+ state.receiveGossipMessage(peer, MessageSender.Type.BLOCK, message);
+ state.receiveGossipMessage(secondPeer, MessageSender.Type.BLOCK, message);
assertEquals(1, repo.eagerPushPeers().size());
assertEquals(1, repo.lazyPushPeers().size());
assertNull(messageSender.payload);
diff --git a/plumtree/src/test/java/org/apache/tuweni/plumtree/vertx/VertxGossipServerTest.java b/plumtree/src/test/java/org/apache/tuweni/plumtree/vertx/VertxGossipServerTest.java
index d6e6106..e6ebe71 100644
--- a/plumtree/src/test/java/org/apache/tuweni/plumtree/vertx/VertxGossipServerTest.java
+++ b/plumtree/src/test/java/org/apache/tuweni/plumtree/vertx/VertxGossipServerTest.java
@@ -20,6 +20,7 @@ import org.apache.tuweni.bytes.Bytes;
import org.apache.tuweni.junit.VertxExtension;
import org.apache.tuweni.junit.VertxInstance;
import org.apache.tuweni.plumtree.EphemeralPeerRepository;
+import org.apache.tuweni.plumtree.MessageSender;
import java.util.concurrent.atomic.AtomicReference;
@@ -57,7 +58,7 @@ class VertxGossipServerTest {
server2.start().join();
server1.connectTo("127.0.0.1", 10001).join();
- server1.gossip(Bytes.fromHexString("deadbeef"));
+ server1.gossip(MessageSender.Type.BLOCK, Bytes.fromHexString("deadbeef"));
Thread.sleep(1000);
assertEquals(Bytes.fromHexString("deadbeef"), messageReceived2.get());
@@ -103,7 +104,7 @@ class VertxGossipServerTest {
server1.connectTo("127.0.0.1", 10001).join();
server3.connectTo("127.0.0.1", 10001).join();
- server1.gossip(Bytes.fromHexString("deadbeef"));
+ server1.gossip(MessageSender.Type.BLOCK, Bytes.fromHexString("deadbeef"));
Thread.sleep(1000);
assertEquals(Bytes.fromHexString("deadbeef"), messageReceived2.get());
assertEquals(Bytes.fromHexString("deadbeef"), messageReceived3.get());
@@ -153,7 +154,7 @@ class VertxGossipServerTest {
server1.connectTo("127.0.0.1", 10001).join();
server2.connectTo("127.0.0.1", 10002).join();
server1.connectTo("127.0.0.1", 10002).join();
- server1.gossip(Bytes.fromHexString("deadbeef"));
+ server1.gossip(MessageSender.Type.BLOCK, Bytes.fromHexString("deadbeef"));
Thread.sleep(1000);
assertEquals(Bytes.fromHexString("deadbeef"), messageReceived2.get());
Thread.sleep(1000);
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@tuweni.apache.org
For additional commands, e-mail: commits-help@tuweni.apache.org