You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tuweni.apache.org by to...@apache.org on 2019/04/26 00:24:16 UTC
[incubator-tuweni] 38/48: changed message_type to generic attribute string
This is an automated email from the ASF dual-hosted git repository.
toulmean pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-tuweni.git
commit b98d45931122a02a6582d62565601acad585cf0b
Author: jonny rhea <jo...@gmail.com>
AuthorDate: Mon Apr 22 22:25:37 2019 -0500
changed message_type to generic attribute string
---
.../org/apache/tuweni/plumtree/MessageSender.java | 11 ++-------
.../java/org/apache/tuweni/plumtree/State.java | 25 ++++++++++-----------
.../tuweni/plumtree/vertx/VertxGossipServer.java | 14 ++++++------
.../java/org/apache/tuweni/plumtree/StateTest.java | 26 +++++++++++++---------
.../plumtree/vertx/VertxGossipServerTest.java | 10 +++++----
5 files changed, 43 insertions(+), 43 deletions(-)
diff --git a/plumtree/src/main/java/org/apache/tuweni/plumtree/MessageSender.java b/plumtree/src/main/java/org/apache/tuweni/plumtree/MessageSender.java
index 5ff8bb6..dc59235 100644
--- a/plumtree/src/main/java/org/apache/tuweni/plumtree/MessageSender.java
+++ b/plumtree/src/main/java/org/apache/tuweni/plumtree/MessageSender.java
@@ -29,21 +29,14 @@ public interface MessageSender {
}
/**
- * Types of message supported by the dialect
- */
- enum Type {
- BLOCK, ATTESTATION, NADA
- }
-
- /**
* Sends bytes to a peer.
*
* @param verb the type of message
- * @param type the type of message
+ * @param attributes the attributes of the message
* @param peer the target of the message
* @param hash the hash of the message
* @param payload the bytes to send
*/
- void sendMessage(Verb verb, Type type, Peer peer, Bytes hash, @Nullable Bytes payload);
+ void sendMessage(Verb verb, String attributes, Peer peer, Bytes hash, @Nullable Bytes payload);
}
diff --git a/plumtree/src/main/java/org/apache/tuweni/plumtree/State.java b/plumtree/src/main/java/org/apache/tuweni/plumtree/State.java
index 5b199e0..7151a45 100644
--- a/plumtree/src/main/java/org/apache/tuweni/plumtree/State.java
+++ b/plumtree/src/main/java/org/apache/tuweni/plumtree/State.java
@@ -71,7 +71,7 @@ public final class State {
* @param sender the sender - may be null if we are submitting this message to the network
* @param message the payload to send to the network
*/
- void fullMessageReceived(@Nullable Peer sender, MessageSender.Type type, Bytes message) {
+ void fullMessageReceived(@Nullable Peer sender, String attributes, Bytes message) {
if (receivedFullMessage.compareAndSet(false, true)) {
for (TimerTask task : tasks) {
task.cancel();
@@ -80,7 +80,7 @@ public final class State {
if (sender == null || messageValidator.validate(message, sender)) {
for (Peer peer : peerRepository.eagerPushPeers()) {
if (sender == null || !sender.equals(peer)) {
- messageSender.sendMessage(MessageSender.Verb.GOSSIP, type, peer, hash, message);
+ messageSender.sendMessage(MessageSender.Verb.GOSSIP, attributes, peer, hash, message);
}
}
lazyQueue.addAll(
@@ -90,7 +90,7 @@ public final class State {
.filter(p -> !lazyPeers.contains(p))
.map(
peer -> (Runnable) (() -> messageSender
- .sendMessage(MessageSender.Verb.IHAVE, MessageSender.Type.NADA, peer, hash, null)))
+ .sendMessage(MessageSender.Verb.IHAVE, null, peer, hash, null)))
.collect(Collectors.toList()));
if (sender != null) {
messageListener.accept(message);
@@ -98,7 +98,7 @@ public final class State {
}
} else {
if (sender != null) {
- messageSender.sendMessage(MessageSender.Verb.PRUNE, MessageSender.Type.NADA, sender, hash, null);
+ messageSender.sendMessage(MessageSender.Verb.PRUNE, null, sender, hash, null);
peerRepository.moveToLazy(sender);
}
}
@@ -112,8 +112,7 @@ public final class State {
if (newPeerIndex == lazyPeers.size()) {
newPeerIndex = 0;
}
- messageSender
- .sendMessage(MessageSender.Verb.GRAFT, MessageSender.Type.NADA, lazyPeers.get(index), hash, null);
+ messageSender.sendMessage(MessageSender.Verb.GRAFT, null, lazyPeers.get(index), hash, null);
scheduleGraftMessage(newPeerIndex++);
}
};
@@ -206,13 +205,13 @@ public final class State {
* Records a message was received in full from a peer.
*
* @param peer the peer that sent the message
- * @param type message type of the message
+ * @param attributes the attributes of the message
* @param message the hash of the message
*/
- public void receiveGossipMessage(Peer peer, MessageSender.Type type, Bytes message) {
+ public void receiveGossipMessage(Peer peer, String attributes, Bytes message) {
peerRepository.considerNewPeer(peer);
MessageHandler handler = messageHandlers.computeIfAbsent(messageHashingFunction.hash(message), MessageHandler::new);
- handler.fullMessageReceived(peer, type, message);
+ handler.fullMessageReceived(peer, attributes, message);
}
/**
@@ -243,20 +242,20 @@ public final class State {
*/
public void receiveGraftMessage(Peer peer, Bytes messageHash) {
peerRepository.moveToEager(peer);
- messageSender.sendMessage(MessageSender.Verb.GOSSIP, MessageSender.Type.NADA, peer, messageHash, null);
+ messageSender.sendMessage(MessageSender.Verb.GOSSIP, null, peer, messageHash, null);
}
/**
* Sends a gossip message to all peers, according to their status.
*
* @param message the message to propagate
- * @param type message type of the message
+ * @param attributes the attributes of the message
* @return The associated hash of the message
*/
- public Bytes sendGossipMessage(MessageSender.Type type, Bytes message) {
+ public Bytes sendGossipMessage(String attributes, Bytes message) {
Bytes messageHash = messageHashingFunction.hash(message);
MessageHandler handler = messageHandlers.computeIfAbsent(messageHash, MessageHandler::new);
- handler.fullMessageReceived(null, type, message);
+ handler.fullMessageReceived(null, attributes, message);
return messageHash;
}
diff --git a/plumtree/src/main/java/org/apache/tuweni/plumtree/vertx/VertxGossipServer.java b/plumtree/src/main/java/org/apache/tuweni/plumtree/vertx/VertxGossipServer.java
index ae10f43..ddbb74c 100644
--- a/plumtree/src/main/java/org/apache/tuweni/plumtree/vertx/VertxGossipServer.java
+++ b/plumtree/src/main/java/org/apache/tuweni/plumtree/vertx/VertxGossipServer.java
@@ -47,7 +47,7 @@ public final class VertxGossipServer {
private static final class Message {
public MessageSender.Verb verb;
- public MessageSender.Type type;
+ public String attributes;
public String hash;
public String payload;
}
@@ -78,7 +78,7 @@ public final class VertxGossipServer {
state.receiveIHaveMessage(peer, Bytes.fromHexString(message.payload));
break;
case GOSSIP:
- state.receiveGossipMessage(peer, message.type, Bytes.fromHexString(message.payload));
+ state.receiveGossipMessage(peer, message.attributes, Bytes.fromHexString(message.payload));
break;
case GRAFT:
state.receiveGraftMessage(peer, Bytes.fromHexString(message.payload));
@@ -137,10 +137,10 @@ public final class VertxGossipServer {
if (res.failed()) {
completion.completeExceptionally(res.cause());
} else {
- state = new State(peerRepository, messageHashing, (verb, type, peer, hash, payload) -> {
+ state = new State(peerRepository, messageHashing, (verb, attributes, peer, hash, payload) -> {
Message message = new Message();
message.verb = verb;
- message.type = type;
+ message.attributes = attributes;
message.hash = hash.toHexString();
message.payload = payload == null ? null : payload.toHexString();
try {
@@ -200,13 +200,13 @@ public final class VertxGossipServer {
/**
* Gossip a message to all known peers.
- *
+ * @param attributes the attributes of the message
* @param message the payload to propagate
*/
- public void gossip(MessageSender.Type type, Bytes message) {
+ public void gossip(String attributes, Bytes message) {
if (!started.get()) {
throw new IllegalStateException("Server has not started");
}
- state.sendGossipMessage(type, message);
+ state.sendGossipMessage(attributes, message);
}
}
diff --git a/plumtree/src/test/java/org/apache/tuweni/plumtree/StateTest.java b/plumtree/src/test/java/org/apache/tuweni/plumtree/StateTest.java
index f245bbc..5c03da3 100644
--- a/plumtree/src/test/java/org/apache/tuweni/plumtree/StateTest.java
+++ b/plumtree/src/test/java/org/apache/tuweni/plumtree/StateTest.java
@@ -36,15 +36,15 @@ class StateTest {
private static class MockMessageSender implements MessageSender {
Verb verb;
- Type type;
+ String attributes;
Peer peer;
Bytes hash;
Bytes payload;
@Override
- public void sendMessage(Verb verb, Type type, Peer peer, Bytes hash, Bytes payload) {
+ public void sendMessage(Verb verb, String attributes, Peer peer, Bytes hash, Bytes payload) {
this.verb = verb;
- this.type = type;
+ this.attributes = attributes;
this.peer = peer;
this.hash = hash;
this.payload = payload;
@@ -140,7 +140,8 @@ class StateTest {
Peer otherPeer = new PeerImpl();
state.addPeer(otherPeer);
Bytes32 msg = Bytes32.random();
- state.receiveGossipMessage(peer, MessageSender.Type.BLOCK, msg);
+ String attributes = "{\"message_type\": \"BLOCK\"}";
+ state.receiveGossipMessage(peer, attributes, msg);
assertEquals(msg, messageSender.payload);
assertEquals(otherPeer, messageSender.peer);
}
@@ -158,7 +159,8 @@ class StateTest {
Peer lazyPeer = new PeerImpl();
state.addPeer(lazyPeer);
repo.moveToLazy(lazyPeer);
- state.receiveGossipMessage(peer, MessageSender.Type.BLOCK, msg);
+ String attributes = "{\"message_type\": \"BLOCK\"}";
+ state.receiveGossipMessage(peer, attributes, msg);
assertEquals(msg, messageSender.payload);
assertEquals(otherPeer, messageSender.peer);
state.processQueue();
@@ -178,7 +180,8 @@ class StateTest {
state.addPeer(lazyPeer);
repo.moveToLazy(lazyPeer);
Bytes message = Bytes32.random();
- state.receiveGossipMessage(peer, MessageSender.Type.BLOCK, message);
+ String attributes = "{\"message_type\": \"BLOCK\"}";
+ state.receiveGossipMessage(peer, attributes, message);
state.receiveIHaveMessage(lazyPeer, message);
assertNull(messageSender.payload);
assertNull(messageSender.peer);
@@ -215,7 +218,8 @@ class StateTest {
Bytes message = Bytes32.random();
state.receiveIHaveMessage(lazyPeer, Hash.keccak256(message));
Thread.sleep(100);
- state.receiveGossipMessage(peer, MessageSender.Type.BLOCK, message);
+ String attributes = "{\"message_type\": \"BLOCK\"}";
+ state.receiveGossipMessage(peer, attributes, message);
Thread.sleep(500);
assertNull(messageSender.verb);
assertNull(messageSender.payload);
@@ -229,7 +233,8 @@ class StateTest {
State state = new State(repo, Hash::keccak256, messageSender, messageRef::set, (message, peer) -> true);
Peer peer = new PeerImpl();
Bytes message = Bytes32.random();
- state.receiveGossipMessage(peer, MessageSender.Type.BLOCK, message);
+ String attributes = "{\"message_type\": \"BLOCK\"}";
+ state.receiveGossipMessage(peer, attributes, message);
assertEquals(1, repo.eagerPushPeers().size());
assertEquals(0, repo.lazyPushPeers().size());
assertEquals(peer, repo.eagerPushPeers().iterator().next());
@@ -243,8 +248,9 @@ class StateTest {
Peer peer = new PeerImpl();
Peer secondPeer = new PeerImpl();
Bytes message = Bytes32.random();
- state.receiveGossipMessage(peer, MessageSender.Type.BLOCK, message);
- state.receiveGossipMessage(secondPeer, MessageSender.Type.BLOCK, message);
+ String attributes = "{\"message_type\": \"BLOCK\"}";
+ state.receiveGossipMessage(peer, attributes, message);
+ state.receiveGossipMessage(secondPeer, attributes, message);
assertEquals(1, repo.eagerPushPeers().size());
assertEquals(1, repo.lazyPushPeers().size());
assertNull(messageSender.payload);
diff --git a/plumtree/src/test/java/org/apache/tuweni/plumtree/vertx/VertxGossipServerTest.java b/plumtree/src/test/java/org/apache/tuweni/plumtree/vertx/VertxGossipServerTest.java
index e6ebe71..384adb3 100644
--- a/plumtree/src/test/java/org/apache/tuweni/plumtree/vertx/VertxGossipServerTest.java
+++ b/plumtree/src/test/java/org/apache/tuweni/plumtree/vertx/VertxGossipServerTest.java
@@ -20,7 +20,6 @@ import org.apache.tuweni.bytes.Bytes;
import org.apache.tuweni.junit.VertxExtension;
import org.apache.tuweni.junit.VertxInstance;
import org.apache.tuweni.plumtree.EphemeralPeerRepository;
-import org.apache.tuweni.plumtree.MessageSender;
import java.util.concurrent.atomic.AtomicReference;
@@ -58,7 +57,8 @@ class VertxGossipServerTest {
server2.start().join();
server1.connectTo("127.0.0.1", 10001).join();
- server1.gossip(MessageSender.Type.BLOCK, Bytes.fromHexString("deadbeef"));
+ String attributes = "{\"message_type\": \"BLOCK\"}";
+ server1.gossip(attributes, Bytes.fromHexString("deadbeef"));
Thread.sleep(1000);
assertEquals(Bytes.fromHexString("deadbeef"), messageReceived2.get());
@@ -104,7 +104,8 @@ class VertxGossipServerTest {
server1.connectTo("127.0.0.1", 10001).join();
server3.connectTo("127.0.0.1", 10001).join();
- server1.gossip(MessageSender.Type.BLOCK, Bytes.fromHexString("deadbeef"));
+ String attributes = "{\"message_type\": \"BLOCK\"}";
+ server1.gossip(attributes, Bytes.fromHexString("deadbeef"));
Thread.sleep(1000);
assertEquals(Bytes.fromHexString("deadbeef"), messageReceived2.get());
assertEquals(Bytes.fromHexString("deadbeef"), messageReceived3.get());
@@ -154,7 +155,8 @@ class VertxGossipServerTest {
server1.connectTo("127.0.0.1", 10001).join();
server2.connectTo("127.0.0.1", 10002).join();
server1.connectTo("127.0.0.1", 10002).join();
- server1.gossip(MessageSender.Type.BLOCK, Bytes.fromHexString("deadbeef"));
+ String attributes = "{\"message_type\": \"BLOCK\"}";
+ server1.gossip(attributes, Bytes.fromHexString("deadbeef"));
Thread.sleep(1000);
assertEquals(Bytes.fromHexString("deadbeef"), messageReceived2.get());
Thread.sleep(1000);
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@tuweni.apache.org
For additional commands, e-mail: commits-help@tuweni.apache.org