You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tuweni.apache.org by to...@apache.org on 2023/01/29 08:55:21 UTC
[incubator-tuweni] branch main updated: better handling of attributes in plumtree - make it a map
This is an automated email from the ASF dual-hosted git repository.
toulmean pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-tuweni.git
The following commit(s) were added to refs/heads/main by this push:
new 46291031c better handling of attributes in plumtree - make it a map
new 6e7f92a29 Merge pull request #500 from atoulme/better_attributes
46291031c is described below
commit 46291031cea04bf01c1639eec0f52c64372e5d0e
Author: Antoine Toulme <an...@lunar-ocean.com>
AuthorDate: Sat Jan 28 22:23:41 2023 -0800
better handling of attributes in plumtree - make it a map
---
.../java/org/apache/tuweni/gossip/GossipApp.java | 2 +-
.../tuweni/plumtree/servlet/GossipServletTest.java | 62 +++++++--------
.../tuweni/plumtree/servlet/GossipServlet.java | 93 ++++++++++++++++------
.../plumtree/vertx/VertxGossipServerTest.java | 18 +++--
.../tuweni/plumtree/vertx/VertxGossipServer.java | 58 +++++++++++---
.../{MessageHashing.java => MessageIdentity.java} | 10 ++-
.../apache/tuweni/plumtree/MessageListener.java | 4 +-
.../org/apache/tuweni/plumtree/MessageSender.java | 3 +-
.../java/org/apache/tuweni/plumtree/State.java | 28 +++----
.../java/org/apache/tuweni/plumtree/StateTest.java | 23 ++++--
10 files changed, 203 insertions(+), 98 deletions(-)
diff --git a/gossip/src/main/java/org/apache/tuweni/gossip/GossipApp.java b/gossip/src/main/java/org/apache/tuweni/gossip/GossipApp.java
index 0d6509c5a..a14a80681 100644
--- a/gossip/src/main/java/org/apache/tuweni/gossip/GossipApp.java
+++ b/gossip/src/main/java/org/apache/tuweni/gossip/GossipApp.java
@@ -235,6 +235,6 @@ public final class GossipApp {
}
public void publish(Bytes message) {
- server.gossip("", message);
+ server.gossip(Collections.emptyMap(), message);
}
}
diff --git a/plumtree-servlet/src/integrationTest/java/org/apache/tuweni/plumtree/servlet/GossipServletTest.java b/plumtree-servlet/src/integrationTest/java/org/apache/tuweni/plumtree/servlet/GossipServletTest.java
index 613fbbe2f..0dc8bcb22 100644
--- a/plumtree-servlet/src/integrationTest/java/org/apache/tuweni/plumtree/servlet/GossipServletTest.java
+++ b/plumtree-servlet/src/integrationTest/java/org/apache/tuweni/plumtree/servlet/GossipServletTest.java
@@ -21,6 +21,10 @@ import org.apache.tuweni.plumtree.EphemeralPeerRepository;
import org.apache.tuweni.plumtree.MessageListener;
import org.apache.tuweni.plumtree.Peer;
+import java.nio.charset.StandardCharsets;
+import java.util.Collections;
+import java.util.Map;
+
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.servlet.ServletContextHandler;
import org.eclipse.jetty.servlet.ServletHolder;
@@ -33,7 +37,7 @@ public class GossipServletTest {
public Bytes message;
@Override
- public void listen(Bytes messageBody, String attributes, Peer peer) {
+ public void listen(Bytes messageBody, Map<String, Bytes> attributes, Peer peer) {
message = messageBody;
}
}
@@ -59,8 +63,7 @@ public class GossipServletTest {
200,
200,
bytes -> bytes,
- "127.0.0.1",
- 10000,
+ "http://127.0.0.1:10000",
messageReceived1,
(message, peer) -> true,
null,
@@ -69,8 +72,7 @@ public class GossipServletTest {
200,
200,
bytes -> bytes,
- "127.0.0.1",
- 10001,
+ "http://127.0.0.1:10001",
messageReceived2,
(message, peer) -> true,
null,
@@ -83,8 +85,9 @@ public class GossipServletTest {
server1.start();
server2.start();
- gossipServlet.connectTo("127.0.0.1", 10001).join();
- String attributes = "{\"message_type\": \"BLOCK\"}";
+ gossipServlet.connectTo("http://127.0.0.1:10001").join();
+ Map<String, Bytes> attributes =
+ Collections.singletonMap("message_type", Bytes.wrap("BLOCK".getBytes(StandardCharsets.UTF_8)));
gossipServlet.gossip(attributes, Bytes.fromHexString("deadbeef"));
for (int i = 0; i < 10; i++) {
Thread.sleep(500);
@@ -107,8 +110,7 @@ public class GossipServletTest {
200,
200,
bytes -> bytes,
- "127.0.0.1",
- 10000,
+ "http://127.0.0.1:10000",
messageReceived1,
(message, peer) -> true,
null,
@@ -117,8 +119,7 @@ public class GossipServletTest {
200,
200,
bytes -> bytes,
- "127.0.0.1",
- 10001,
+ "http://127.0.0.1:10001",
messageReceived2,
(message, peer) -> true,
null,
@@ -127,8 +128,7 @@ public class GossipServletTest {
200,
200,
bytes -> bytes,
- "127.0.0.1",
- 10002,
+ "http://127.0.0.1:10002",
messageReceived3,
(message, peer) -> true,
null,
@@ -143,9 +143,10 @@ public class GossipServletTest {
server2.start();
server3.start();
- gossipServlet.connectTo("127.0.0.1", 10001).join();
- gossipServlet3.connectTo("127.0.0.1", 10001).join();
- String attributes = "{\"message_type\": \"BLOCK\"}";
+ gossipServlet.connectTo("http://127.0.0.1:10001").join();
+ gossipServlet3.connectTo("http://127.0.0.1:10001").join();
+ Map<String, Bytes> attributes =
+ Collections.singletonMap("message_type", Bytes.wrap("BLOCK".getBytes(StandardCharsets.UTF_8)));
gossipServlet.gossip(attributes, Bytes.fromHexString("deadbeef"));
for (int i = 0; i < 10; i++) {
Thread.sleep(500);
@@ -175,8 +176,7 @@ public class GossipServletTest {
200,
200,
bytes -> bytes,
- "127.0.0.1",
- 10000,
+ "http://127.0.0.1:10000",
messageReceived1,
(message, peer) -> true,
null,
@@ -185,8 +185,7 @@ public class GossipServletTest {
200,
200,
bytes -> bytes,
- "127.0.0.1",
- 10001,
+ "http://127.0.0.1:10001",
messageReceived2,
(message, peer) -> true,
null,
@@ -195,8 +194,7 @@ public class GossipServletTest {
200,
200,
bytes -> bytes,
- "127.0.0.1",
- 10002,
+ "http://127.0.0.1:10002",
messageReceived2,
(message, peer) -> true,
null,
@@ -212,12 +210,13 @@ public class GossipServletTest {
try {
- gossipServlet.connectTo("127.0.0.1", 10001).join();
- gossipServlet2.connectTo("127.0.0.1", 10002).join();
- gossipServlet.connectTo("127.0.0.1", 10002).join();
+ gossipServlet.connectTo("http://127.0.0.1:10001").join();
+ gossipServlet2.connectTo("http://127.0.0.1:10002").join();
+ gossipServlet.connectTo("http://127.0.0.1:10002").join();
assertEquals(2, peerRepository1.eagerPushPeers().size());
- String attributes = "{\"message_type\": \"BLOCK\"}";
+ Map<String, Bytes> attributes =
+ Collections.singletonMap("message_type", Bytes.wrap("BLOCK".getBytes(StandardCharsets.UTF_8)));
gossipServlet.gossip(attributes, Bytes.fromHexString("deadbeef"));
Thread.sleep(1000);
assertEquals(Bytes.fromHexString("deadbeef"), messageReceived2.message);
@@ -243,8 +242,7 @@ public class GossipServletTest {
200,
200,
bytes -> bytes,
- "127.0.0.1",
- 10000,
+ "http://127.0.0.1:10000",
messageReceived1,
(message, peer) -> true,
null,
@@ -253,8 +251,7 @@ public class GossipServletTest {
200,
200,
bytes -> bytes,
- "127.0.0.1",
- 10001,
+ "http://127.0.0.1:10001",
messageReceived2,
(message, peer) -> true,
null,
@@ -266,9 +263,10 @@ public class GossipServletTest {
server1.start();
server2.start();
- gossipServlet.connectTo("127.0.0.1", 10001).join();
+ gossipServlet.connectTo("http://127.0.0.1:10001").join();
assertEquals(1, peerRepository1.eagerPushPeers().size());
- String attributes = "{\"message_type\": \"BLOCK\"}";
+ Map<String, Bytes> attributes =
+ Collections.singletonMap("message_type", Bytes.wrap("BLOCK".getBytes(StandardCharsets.UTF_8)));
gossipServlet.send(peerRepository1.peers().iterator().next(), attributes, Bytes.fromHexString("deadbeef"));
Thread.sleep(1000);
assertEquals(Bytes.fromHexString("deadbeef"), messageReceived2.message);
diff --git a/plumtree-servlet/src/main/java/org/apache/tuweni/plumtree/servlet/GossipServlet.java b/plumtree-servlet/src/main/java/org/apache/tuweni/plumtree/servlet/GossipServlet.java
index 4258406c4..f7aa722fa 100644
--- a/plumtree-servlet/src/main/java/org/apache/tuweni/plumtree/servlet/GossipServlet.java
+++ b/plumtree-servlet/src/main/java/org/apache/tuweni/plumtree/servlet/GossipServlet.java
@@ -20,7 +20,7 @@ import org.apache.hc.core5.http.io.entity.ByteArrayEntity;
import org.apache.tuweni.bytes.Bytes;
import org.apache.tuweni.concurrent.AsyncCompletion;
import org.apache.tuweni.concurrent.CompletableAsyncCompletion;
-import org.apache.tuweni.plumtree.MessageHashing;
+import org.apache.tuweni.plumtree.MessageIdentity;
import org.apache.tuweni.plumtree.MessageListener;
import org.apache.tuweni.plumtree.MessageSender;
import org.apache.tuweni.plumtree.MessageValidator;
@@ -31,10 +31,18 @@ import org.apache.tuweni.plumtree.State;
import java.io.IOException;
import java.io.UncheckedIOException;
+import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
+import com.fasterxml.jackson.core.JsonGenerator;
+import com.fasterxml.jackson.core.JsonParser;
+import com.fasterxml.jackson.databind.DeserializationContext;
import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.SerializerProvider;
+import com.fasterxml.jackson.databind.deser.std.StdDeserializer;
+import com.fasterxml.jackson.databind.module.SimpleModule;
+import com.fasterxml.jackson.databind.ser.std.StdSerializer;
import jakarta.servlet.ServletConfig;
import jakarta.servlet.ServletException;
import jakarta.servlet.http.HttpServlet;
@@ -50,14 +58,51 @@ public class GossipServlet extends HttpServlet {
private static final Logger logger = LoggerFactory.getLogger(GossipServlet.class);
private static final ObjectMapper mapper = new ObjectMapper();
- private void sendMessage(MessageSender.Verb verb, String attributes, Peer peer, Bytes hash, Bytes payload) {
+ private final static class BytesSerializer extends StdSerializer<Bytes> {
+
+ public BytesSerializer() {
+ super(Bytes.class);
+ }
+
+ @Override
+ public void serialize(Bytes value, JsonGenerator gen, SerializerProvider provider) throws IOException {
+ gen.writeString(value.toHexString());
+ }
+ }
+ static class BytesDeserializer extends StdDeserializer<Bytes> {
+
+ BytesDeserializer() {
+ super(Bytes.class);
+ }
+
+ @Override
+ public Bytes deserialize(JsonParser p, DeserializationContext ctxt) throws IOException {
+ String value = p.getValueAsString();
+ return Bytes.fromHexStringLenient(value);
+ }
+
+ }
+
+ static {
+ SimpleModule module = new SimpleModule();
+ module.addSerializer(Bytes.class, new BytesSerializer());
+ module.addDeserializer(Bytes.class, new BytesDeserializer());
+ mapper.registerModule(module);
+ }
+
+ private void sendMessage(
+ MessageSender.Verb verb,
+ Map<String, Bytes> attributes,
+ Peer peer,
+ Bytes hash,
+ Bytes payload) {
Message message = new Message();
message.verb = verb;
message.attributes = attributes;
message.hash = hash.toHexString();
message.payload = payload == null ? null : payload.toHexString();
- HttpPost postMessage = new HttpPost("http://" + ((ServletPeer) peer).getAddress());
- postMessage.setHeader(PLUMTREE_SERVER_HEADER, this.networkInterface + ":" + this.port);
+ HttpPost postMessage = new HttpPost(((ServletPeer) peer).getAddress());
+ postMessage.setHeader(PLUMTREE_SERVER_HEADER, this.externalURL);
try {
ByteArrayEntity entity = new ByteArrayEntity(mapper.writeValueAsBytes(message), ContentType.APPLICATION_JSON);
postMessage.setEntity(entity);
@@ -76,17 +121,17 @@ public class GossipServlet extends HttpServlet {
}
private static final class Message {
+
public MessageSender.Verb verb;
- public String attributes;
+ public Map<String, Bytes> attributes;
public String hash;
public String payload;
}
private final int graftDelay;
private final int lazyQueueInterval;
- private final MessageHashing messageHashing;
- private final String networkInterface;
- private final int port;
+ private final MessageIdentity messageIdentity;
+ private final String externalURL;
private final MessageListener payloadListener;
private final MessageValidator payloadValidator;
private final PeerPruning peerPruningFunction;
@@ -98,18 +143,16 @@ public class GossipServlet extends HttpServlet {
public GossipServlet(
int graftDelay,
int lazyQueueInterval,
- MessageHashing messageHashing,
- String networkInterface,
- int port,
+ MessageIdentity messageIdentity,
+ String externalURL,
MessageListener payloadListener,
MessageValidator payloadValidator,
PeerPruning peerPruningFunction,
PeerRepository peerRepository) {
this.graftDelay = graftDelay;
this.lazyQueueInterval = lazyQueueInterval;
- this.messageHashing = messageHashing;
- this.networkInterface = networkInterface;
- this.port = port;
+ this.messageIdentity = messageIdentity;
+ this.externalURL = externalURL;
this.payloadListener = payloadListener;
this.payloadValidator = payloadValidator == null ? (bytes, peer) -> true : payloadValidator;
this.peerPruningFunction = peerPruningFunction == null ? (peer) -> true : peerPruningFunction;
@@ -125,7 +168,7 @@ public class GossipServlet extends HttpServlet {
httpclient = HttpClients.createDefault();
state = new State(
peerRepository,
- messageHashing,
+ messageIdentity,
this::sendMessage,
payloadListener,
payloadValidator,
@@ -200,7 +243,7 @@ public class GossipServlet extends HttpServlet {
* @param attributes the payload to propagate
* @param message the payload to propagate
*/
- public void gossip(String attributes, Bytes message) {
+ public void gossip(Map<String, Bytes> attributes, Bytes message) {
if (!started.get()) {
throw new IllegalStateException("Server has not started");
}
@@ -214,14 +257,14 @@ public class GossipServlet extends HttpServlet {
* @param attributes the payload to propagate
* @param message the payload to propagate
*/
- public void send(Peer peer, String attributes, Bytes message) {
+ public void send(Peer peer, Map<String, Bytes> attributes, Bytes message) {
if (!started.get()) {
throw new IllegalStateException("Server has not started");
}
state.sendMessage(peer, attributes, message);
}
- public AsyncCompletion connectTo(String host, int port) {
+ public AsyncCompletion connectTo(String url) {
if (!started.get()) {
throw new IllegalStateException("Server has not started");
}
@@ -229,22 +272,22 @@ public class GossipServlet extends HttpServlet {
CompletableAsyncCompletion completion = AsyncCompletion.incomplete();
AtomicInteger counter = new AtomicInteger(0);
- roundConnect(host, port, counter, completion);
+ roundConnect(url, counter, completion);
return completion;
}
- private void roundConnect(String host, int port, AtomicInteger counter, CompletableAsyncCompletion completion) {
- ServletPeer peer = new ServletPeer(host + ":" + port);
- HttpPost postMessage = new HttpPost("http://" + peer.getAddress());
- postMessage.setHeader(PLUMTREE_SERVER_HEADER, this.networkInterface + ":" + this.port);
+ private void roundConnect(String url, AtomicInteger counter, CompletableAsyncCompletion completion) {
+ ServletPeer peer = new ServletPeer(url);
+ HttpPost postMessage = new HttpPost(peer.getAddress());
+ postMessage.setHeader(PLUMTREE_SERVER_HEADER, this.externalURL);
try {
httpclient.execute(postMessage, response -> {
if (response.getCode() > 299) {
if (counter.incrementAndGet() > 5) {
completion.completeExceptionally(new RuntimeException(response.getEntity().toString()));
} else {
- roundConnect(host, port, counter, completion);
+ roundConnect(url, counter, completion);
}
} else {
state.addPeer(peer);
@@ -256,7 +299,7 @@ public class GossipServlet extends HttpServlet {
if (counter.incrementAndGet() > 5) {
completion.completeExceptionally(e);
} else {
- roundConnect(host, port, counter, completion);
+ roundConnect(url, counter, completion);
}
}
}
diff --git a/plumtree-vertx/src/integrationTest/java/org/apache/tuweni/plumtree/vertx/VertxGossipServerTest.java b/plumtree-vertx/src/integrationTest/java/org/apache/tuweni/plumtree/vertx/VertxGossipServerTest.java
index 7f64843af..d16b98fd7 100644
--- a/plumtree-vertx/src/integrationTest/java/org/apache/tuweni/plumtree/vertx/VertxGossipServerTest.java
+++ b/plumtree-vertx/src/integrationTest/java/org/apache/tuweni/plumtree/vertx/VertxGossipServerTest.java
@@ -23,6 +23,10 @@ import org.apache.tuweni.plumtree.EphemeralPeerRepository;
import org.apache.tuweni.plumtree.MessageListener;
import org.apache.tuweni.plumtree.Peer;
+import java.nio.charset.StandardCharsets;
+import java.util.Collections;
+import java.util.Map;
+
import io.vertx.core.Vertx;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
@@ -35,7 +39,7 @@ class VertxGossipServerTest {
public Bytes message;
@Override
- public void listen(Bytes messageBody, String attributes, Peer peer) {
+ public void listen(Bytes messageBody, Map<String, Bytes> attributes, Peer peer) {
message = messageBody;
}
}
@@ -73,7 +77,8 @@ class VertxGossipServerTest {
server2.start().join();
server1.connectTo("127.0.0.1", 10001).join();
- String attributes = "{\"message_type\": \"BLOCK\"}";
+ Map<String, Bytes> attributes =
+ Collections.singletonMap("message_type", Bytes.wrap("BLOCK".getBytes(StandardCharsets.UTF_8)));
server1.gossip(attributes, Bytes.fromHexString("deadbeef"));
for (int i = 0; i < 10; i++) {
Thread.sleep(500);
@@ -134,7 +139,8 @@ class VertxGossipServerTest {
server1.connectTo("127.0.0.1", 10001).join();
server3.connectTo("127.0.0.1", 10001).join();
- String attributes = "{\"message_type\": \"BLOCK\"}";
+ Map<String, Bytes> attributes =
+ Collections.singletonMap("message_type", Bytes.wrap("BLOCK".getBytes(StandardCharsets.UTF_8)));
server1.gossip(attributes, Bytes.fromHexString("deadbeef"));
for (int i = 0; i < 10; i++) {
Thread.sleep(500);
@@ -202,7 +208,8 @@ class VertxGossipServerTest {
server2.connectTo("127.0.0.1", 10002).join();
server1.connectTo("127.0.0.1", 10002).join();
assertEquals(2, peerRepository1.eagerPushPeers().size());
- String attributes = "{\"message_type\": \"BLOCK\"}";
+ Map<String, Bytes> attributes =
+ Collections.singletonMap("message_type", Bytes.wrap("BLOCK".getBytes(StandardCharsets.UTF_8)));
server1.gossip(attributes, Bytes.fromHexString("deadbeef"));
Thread.sleep(1000);
assertEquals(Bytes.fromHexString("deadbeef"), messageReceived2.message);
@@ -250,7 +257,8 @@ class VertxGossipServerTest {
server1.connectTo("127.0.0.1", 10001).join();
assertEquals(1, peerRepository1.eagerPushPeers().size());
- String attributes = "{\"message_type\": \"BLOCK\"}";
+ Map<String, Bytes> attributes =
+ Collections.singletonMap("message_type", Bytes.wrap("BLOCK".getBytes(StandardCharsets.UTF_8)));
server1.send(peerRepository1.peers().iterator().next(), attributes, Bytes.fromHexString("deadbeef"));
Thread.sleep(1000);
assertEquals(Bytes.fromHexString("deadbeef"), messageReceived2.message);
diff --git a/plumtree-vertx/src/main/java/org/apache/tuweni/plumtree/vertx/VertxGossipServer.java b/plumtree-vertx/src/main/java/org/apache/tuweni/plumtree/vertx/VertxGossipServer.java
index fb08b6eb6..316a1657e 100644
--- a/plumtree-vertx/src/main/java/org/apache/tuweni/plumtree/vertx/VertxGossipServer.java
+++ b/plumtree-vertx/src/main/java/org/apache/tuweni/plumtree/vertx/VertxGossipServer.java
@@ -15,7 +15,7 @@ package org.apache.tuweni.plumtree.vertx;
import org.apache.tuweni.bytes.Bytes;
import org.apache.tuweni.concurrent.AsyncCompletion;
import org.apache.tuweni.concurrent.CompletableAsyncCompletion;
-import org.apache.tuweni.plumtree.MessageHashing;
+import org.apache.tuweni.plumtree.MessageIdentity;
import org.apache.tuweni.plumtree.MessageListener;
import org.apache.tuweni.plumtree.MessageSender;
import org.apache.tuweni.plumtree.MessageValidator;
@@ -25,13 +25,20 @@ import org.apache.tuweni.plumtree.PeerRepository;
import org.apache.tuweni.plumtree.State;
import java.io.IOException;
+import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import javax.annotation.Nullable;
+import com.fasterxml.jackson.core.JsonGenerator;
import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.DeserializationContext;
import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.SerializerProvider;
+import com.fasterxml.jackson.databind.deser.std.StdDeserializer;
+import com.fasterxml.jackson.databind.module.SimpleModule;
+import com.fasterxml.jackson.databind.ser.std.StdSerializer;
import io.vertx.core.Vertx;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.net.NetClient;
@@ -47,10 +54,42 @@ public final class VertxGossipServer {
private static final ObjectMapper mapper = new ObjectMapper();
+ private final static class BytesSerializer extends StdSerializer<Bytes> {
+
+ public BytesSerializer() {
+ super(Bytes.class);
+ }
+
+ @Override
+ public void serialize(Bytes value, JsonGenerator gen, SerializerProvider provider) throws IOException {
+ gen.writeString(value.toHexString());
+ }
+ }
+ static class BytesDeserializer extends StdDeserializer<Bytes> {
+
+ BytesDeserializer() {
+ super(Bytes.class);
+ }
+
+ @Override
+ public Bytes deserialize(JsonParser p, DeserializationContext ctxt) throws IOException {
+ String value = p.getValueAsString();
+ return Bytes.fromHexStringLenient(value);
+ }
+
+ }
+
+ static {
+ SimpleModule module = new SimpleModule();
+ module.addSerializer(Bytes.class, new BytesSerializer());
+ module.addDeserializer(Bytes.class, new BytesDeserializer());
+ mapper.registerModule(module);
+ }
+
private static final class Message {
public MessageSender.Verb verb;
- public String attributes;
+ public Map<String, Bytes> attributes;
public String hash;
public String payload;
}
@@ -69,8 +108,7 @@ public final class VertxGossipServer {
buffer = Bytes.concatenate(buffer, Bytes.wrapBuffer(data));
while (!buffer.isEmpty()) {
Message message;
- try {
- JsonParser parser = mapper.getFactory().createParser(buffer.toArrayUnsafe());
+ try (JsonParser parser = mapper.getFactory().createParser(buffer.toArrayUnsafe())) {
message = parser.readValueAs(Message.class);
buffer = buffer.slice((int) parser.getCurrentLocation().getByteOffset());
} catch (IOException e) {
@@ -109,7 +147,7 @@ public final class VertxGossipServer {
private NetClient client;
private final int graftDelay;
private final int lazyQueueInterval;
- private final MessageHashing messageHashing;
+ private final MessageIdentity messageIdentity;
private final String networkInterface;
private final MessageListener payloadListener;
private final MessageValidator payloadValidator;
@@ -125,7 +163,7 @@ public final class VertxGossipServer {
Vertx vertx,
String networkInterface,
int port,
- MessageHashing messageHashing,
+ MessageIdentity messageIdentity,
PeerRepository peerRepository,
MessageListener payloadListener,
@Nullable MessageValidator payloadValidator,
@@ -135,7 +173,7 @@ public final class VertxGossipServer {
this.vertx = vertx;
this.networkInterface = networkInterface;
this.port = port;
- this.messageHashing = messageHashing;
+ this.messageIdentity = messageIdentity;
this.peerRepository = peerRepository;
this.payloadListener = payloadListener;
this.payloadValidator = payloadValidator == null ? (bytes, peer) -> true : payloadValidator;
@@ -159,7 +197,7 @@ public final class VertxGossipServer {
if (res.failed()) {
completion.completeExceptionally(res.cause());
} else {
- state = new State(peerRepository, messageHashing, (verb, attributes, peer, hash, payload) -> {
+ state = new State(peerRepository, messageIdentity, (verb, attributes, peer, hash, payload) -> {
vertx.executeBlocking(future -> {
Message message = new Message();
message.verb = verb;
@@ -242,7 +280,7 @@ public final class VertxGossipServer {
* @param attributes the payload to propagate
* @param message the payload to propagate
*/
- public void gossip(String attributes, Bytes message) {
+ public void gossip(Map<String, Bytes> attributes, Bytes message) {
if (!started.get()) {
throw new IllegalStateException("Server has not started");
}
@@ -256,7 +294,7 @@ public final class VertxGossipServer {
* @param attributes the payload to propagate
* @param message the payload to propagate
*/
- public void send(Peer peer, String attributes, Bytes message) {
+ public void send(Peer peer, Map<String, Bytes> attributes, Bytes message) {
if (!started.get()) {
throw new IllegalStateException("Server has not started");
}
diff --git a/plumtree/src/main/java/org/apache/tuweni/plumtree/MessageHashing.java b/plumtree/src/main/java/org/apache/tuweni/plumtree/MessageIdentity.java
similarity index 81%
rename from plumtree/src/main/java/org/apache/tuweni/plumtree/MessageHashing.java
rename to plumtree/src/main/java/org/apache/tuweni/plumtree/MessageIdentity.java
index 3257c618f..9fd6cbe82 100644
--- a/plumtree/src/main/java/org/apache/tuweni/plumtree/MessageHashing.java
+++ b/plumtree/src/main/java/org/apache/tuweni/plumtree/MessageIdentity.java
@@ -18,8 +18,14 @@ import org.apache.tuweni.bytes.Bytes;
* Produces an identifiable footprint for a message (generally a hash) that can be passed on to other peers to identify
* uniquely a message being propagated.
*/
-public interface MessageHashing {
+public interface MessageIdentity {
- public Bytes hash(Bytes message);
+ /**
+ * Generates the identity of the message
+ *
+ * @param message the message from which to extract an identity
+ * @return the identity of the message
+ */
+ Bytes identity(Bytes message);
}
diff --git a/plumtree/src/main/java/org/apache/tuweni/plumtree/MessageListener.java b/plumtree/src/main/java/org/apache/tuweni/plumtree/MessageListener.java
index 9cda77f18..b5640b605 100644
--- a/plumtree/src/main/java/org/apache/tuweni/plumtree/MessageListener.java
+++ b/plumtree/src/main/java/org/apache/tuweni/plumtree/MessageListener.java
@@ -14,6 +14,8 @@ package org.apache.tuweni.plumtree;
import org.apache.tuweni.bytes.Bytes;
+import java.util.Map;
+
/**
* Listens to an incoming message, along with its attributes.
*/
@@ -26,5 +28,5 @@ public interface MessageListener {
* @param attributes the attributes of the message
* @param peer the peer we received the message from
*/
- public void listen(Bytes messageBody, String attributes, Peer peer);
+ public void listen(Bytes messageBody, Map<String, Bytes> attributes, Peer peer);
}
diff --git a/plumtree/src/main/java/org/apache/tuweni/plumtree/MessageSender.java b/plumtree/src/main/java/org/apache/tuweni/plumtree/MessageSender.java
index 8a8658000..6b8645bd4 100644
--- a/plumtree/src/main/java/org/apache/tuweni/plumtree/MessageSender.java
+++ b/plumtree/src/main/java/org/apache/tuweni/plumtree/MessageSender.java
@@ -14,6 +14,7 @@ package org.apache.tuweni.plumtree;
import org.apache.tuweni.bytes.Bytes;
+import java.util.Map;
import javax.annotation.Nullable;
/**
@@ -56,6 +57,6 @@ public interface MessageSender {
* @param hash the hash of the message
* @param payload the bytes to send
*/
- void sendMessage(Verb verb, String attributes, Peer peer, Bytes hash, @Nullable Bytes payload);
+ void sendMessage(Verb verb, Map<String, Bytes> attributes, Peer peer, Bytes hash, @Nullable Bytes payload);
}
diff --git a/plumtree/src/main/java/org/apache/tuweni/plumtree/State.java b/plumtree/src/main/java/org/apache/tuweni/plumtree/State.java
index b6b67d1e6..6b175fbbf 100644
--- a/plumtree/src/main/java/org/apache/tuweni/plumtree/State.java
+++ b/plumtree/src/main/java/org/apache/tuweni/plumtree/State.java
@@ -37,7 +37,7 @@ public final class State {
private static final int maxMessagesHandlers = 1000000;
private final PeerRepository peerRepository;
- private final MessageHashing messageHashingFunction;
+ private final MessageIdentity messageIdentityFunction;
private final Map<Bytes, MessageHandler> messageHandlers = Collections.synchronizedMap(new LinkedHashMap<>() {
@Override
@@ -54,8 +54,8 @@ public final class State {
private final Timer timer = new Timer("plumtree", true);
private final long delay;
- public void sendMessage(Peer peer, String attributes, Bytes message) {
- Bytes messageHash = messageHashingFunction.hash(message);
+ public void sendMessage(Peer peer, Map<String, Bytes> attributes, Bytes message) {
+ Bytes messageHash = messageIdentityFunction.identity(message);
messageSender.sendMessage(MessageSender.Verb.SEND, attributes, peer, messageHash, message);
}
@@ -78,7 +78,7 @@ public final class State {
* @param sender the sender - may be null if we are submitting this message to the network
* @param message the payload to send to the network
*/
- void fullMessageReceived(@Nullable Peer sender, String attributes, Bytes message) {
+ void fullMessageReceived(@Nullable Peer sender, Map<String, Bytes> attributes, Bytes message) {
if (receivedFullMessage.compareAndSet(false, true)) {
for (TimerTask task : tasks) {
task.cancel();
@@ -145,7 +145,7 @@ public final class State {
* Constructor using default time constants.
*
* @param peerRepository the peer repository to use to store and access peer information.
- * @param messageHashingFunction the function to use to hash messages into hashes to compare them.
+ * @param messageIdentityFunction the function to use to hash messages into hashes to compare them.
* @param messageSender a function abstracting sending messages to other peers.
* @param messageListener a function consuming messages when they are gossiped.
* @param messageValidator a function validating messages before they are gossiped to other peers.
@@ -153,14 +153,14 @@ public final class State {
*/
public State(
PeerRepository peerRepository,
- MessageHashing messageHashingFunction,
+ MessageIdentity messageIdentityFunction,
MessageSender messageSender,
MessageListener messageListener,
MessageValidator messageValidator,
PeerPruning peerPruningFunction) {
this(
peerRepository,
- messageHashingFunction,
+ messageIdentityFunction,
messageSender,
messageListener,
messageValidator,
@@ -173,7 +173,7 @@ public final class State {
* Default constructor.
*
* @param peerRepository the peer repository to use to store and access peer information.
- * @param messageHashingFunction the function to use to hash messages into hashes to compare them.
+ * @param messageIdentityFunction the function to use to hash messages into hashes to compare them.
* @param messageSender a function abstracting sending messages to other peers.
* @param messageListener a function consuming messages when they are gossiped.
* @param messageValidator a function validating messages before they are gossiped to other peers.
@@ -184,7 +184,7 @@ public final class State {
*/
public State(
PeerRepository peerRepository,
- MessageHashing messageHashingFunction,
+ MessageIdentity messageIdentityFunction,
MessageSender messageSender,
MessageListener messageListener,
MessageValidator messageValidator,
@@ -192,7 +192,7 @@ public final class State {
long graftDelay,
long lazyQueueInterval) {
this.peerRepository = peerRepository;
- this.messageHashingFunction = messageHashingFunction;
+ this.messageIdentityFunction = messageIdentityFunction;
this.messageSender = messageSender;
this.messageListener = messageListener;
this.messageValidator = messageValidator;
@@ -233,8 +233,8 @@ public final class State {
* @param message the message
* @param messageHash the hash of the message
*/
- public void receiveGossipMessage(Peer peer, String attributes, Bytes message, Bytes messageHash) {
- Bytes checkHash = messageHashingFunction.hash(message);
+ public void receiveGossipMessage(Peer peer, Map<String, Bytes> attributes, Bytes message, Bytes messageHash) {
+ Bytes checkHash = messageIdentityFunction.identity(message);
if (!checkHash.equals(messageHash)) {
return;
}
@@ -281,8 +281,8 @@ public final class State {
* @param attributes of the message
* @return The associated hash of the message
*/
- public Bytes sendGossipMessage(String attributes, Bytes message) {
- Bytes messageHash = messageHashingFunction.hash(message);
+ public Bytes sendGossipMessage(Map<String, Bytes> attributes, Bytes message) {
+ Bytes messageHash = messageIdentityFunction.identity(message);
MessageHandler handler = messageHandlers.computeIfAbsent(messageHash, MessageHandler::new);
handler.fullMessageReceived(null, attributes, message);
return messageHash;
diff --git a/plumtree/src/test/java/org/apache/tuweni/plumtree/StateTest.java b/plumtree/src/test/java/org/apache/tuweni/plumtree/StateTest.java
index 2e4af8e67..96750f522 100644
--- a/plumtree/src/test/java/org/apache/tuweni/plumtree/StateTest.java
+++ b/plumtree/src/test/java/org/apache/tuweni/plumtree/StateTest.java
@@ -21,6 +21,9 @@ import org.apache.tuweni.bytes.Bytes32;
import org.apache.tuweni.crypto.Hash;
import org.apache.tuweni.junit.BouncyCastleExtension;
+import java.nio.charset.StandardCharsets;
+import java.util.Collections;
+import java.util.Map;
import java.util.UUID;
import org.jetbrains.annotations.NotNull;
@@ -47,7 +50,7 @@ class StateTest {
Bytes payload;
@Override
- public void sendMessage(Verb verb, String attributes, Peer peer, Bytes hash, Bytes payload) {
+ public void sendMessage(Verb verb, Map<String, Bytes> attributes, Peer peer, Bytes hash, Bytes payload) {
this.verb = verb;
this.peer = peer;
this.hash = hash;
@@ -181,7 +184,8 @@ class StateTest {
Peer otherPeer = new PeerImpl();
state.addPeer(otherPeer);
Bytes32 msg = Bytes32.random();
- String attributes = "{\"message_type\": \"BLOCK\"}";
+ Map<String, Bytes> attributes =
+ Collections.singletonMap("message_type", Bytes.wrap("BLOCK".getBytes(StandardCharsets.UTF_8)));
state.receiveGossipMessage(peer, attributes, msg, Hash.keccak256(msg));
assertEquals(msg, messageSender.payload);
assertEquals(otherPeer, messageSender.peer);
@@ -201,7 +205,8 @@ class StateTest {
Peer lazyPeer = new PeerImpl();
state.addPeer(lazyPeer);
repo.moveToLazy(lazyPeer);
- String attributes = "{\"message_type\": \"BLOCK\"}";
+ Map<String, Bytes> attributes =
+ Collections.singletonMap("message_type", Bytes.wrap("BLOCK".getBytes(StandardCharsets.UTF_8)));
state.receiveGossipMessage(peer, attributes, msg, Hash.keccak256(msg));
assertEquals(msg, messageSender.payload);
assertEquals(otherPeer, messageSender.peer);
@@ -225,7 +230,8 @@ class StateTest {
state.addPeer(lazyPeer);
repo.moveToLazy(lazyPeer);
Bytes message = Bytes32.random();
- String attributes = "{\"message_type\": \"BLOCK\"}";
+ Map<String, Bytes> attributes =
+ Collections.singletonMap("message_type", Bytes.wrap("BLOCK".getBytes(StandardCharsets.UTF_8)));
state.receiveGossipMessage(peer, attributes, message, Hash.keccak256(message));
state.receiveIHaveMessage(lazyPeer, message);
assertNull(messageSender.payload);
@@ -279,7 +285,8 @@ class StateTest {
Bytes message = Bytes32.random();
state.receiveIHaveMessage(lazyPeer, Hash.keccak256(message));
Thread.sleep(100);
- String attributes = "{\"message_type\": \"BLOCK\"}";
+ Map<String, Bytes> attributes =
+ Collections.singletonMap("message_type", Bytes.wrap("BLOCK".getBytes(StandardCharsets.UTF_8)));
state.receiveGossipMessage(peer, attributes, message, Hash.keccak256(message));
Thread.sleep(500);
assertNull(messageSender.verb);
@@ -295,7 +302,8 @@ class StateTest {
new State(repo, Hash::keccak256, messageSender, messageListener, (message, peer) -> true, (peer) -> true);
Peer peer = new PeerImpl();
Bytes message = Bytes32.random();
- String attributes = "{\"message_type\": \"BLOCK\"}";
+ Map<String, Bytes> attributes =
+ Collections.singletonMap("message_type", Bytes.wrap("BLOCK".getBytes(StandardCharsets.UTF_8)));
state.receiveGossipMessage(peer, attributes, message, Hash.keccak256(message));
assertEquals(1, repo.eagerPushPeers().size());
assertEquals(0, repo.lazyPushPeers().size());
@@ -311,7 +319,8 @@ class StateTest {
Peer peer = new PeerImpl();
Peer secondPeer = new PeerImpl();
Bytes message = Bytes32.random();
- String attributes = "{\"message_type\": \"BLOCK\"}";
+ Map<String, Bytes> attributes =
+ Collections.singletonMap("message_type", Bytes.wrap("BLOCK".getBytes(StandardCharsets.UTF_8)));
state.receiveGossipMessage(peer, attributes, message, Hash.keccak256(message));
state.receiveGossipMessage(secondPeer, attributes, message, Hash.keccak256(message));
assertEquals(2, repo.eagerPushPeers().size());
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@tuweni.apache.org
For additional commands, e-mail: commits-help@tuweni.apache.org