You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tuweni.apache.org by to...@apache.org on 2023/01/29 08:55:21 UTC

[incubator-tuweni] branch main updated: better handling of attributes in plumtree - make it a map

This is an automated email from the ASF dual-hosted git repository.

toulmean pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-tuweni.git


The following commit(s) were added to refs/heads/main by this push:
     new 46291031c better handling of attributes in plumtree - make it a map
     new 6e7f92a29 Merge pull request #500 from atoulme/better_attributes
46291031c is described below

commit 46291031cea04bf01c1639eec0f52c64372e5d0e
Author: Antoine Toulme <an...@lunar-ocean.com>
AuthorDate: Sat Jan 28 22:23:41 2023 -0800

    better handling of attributes in plumtree - make it a map
---
 .../java/org/apache/tuweni/gossip/GossipApp.java   |  2 +-
 .../tuweni/plumtree/servlet/GossipServletTest.java | 62 +++++++--------
 .../tuweni/plumtree/servlet/GossipServlet.java     | 93 ++++++++++++++++------
 .../plumtree/vertx/VertxGossipServerTest.java      | 18 +++--
 .../tuweni/plumtree/vertx/VertxGossipServer.java   | 58 +++++++++++---
 .../{MessageHashing.java => MessageIdentity.java}  | 10 ++-
 .../apache/tuweni/plumtree/MessageListener.java    |  4 +-
 .../org/apache/tuweni/plumtree/MessageSender.java  |  3 +-
 .../java/org/apache/tuweni/plumtree/State.java     | 28 +++----
 .../java/org/apache/tuweni/plumtree/StateTest.java | 23 ++++--
 10 files changed, 203 insertions(+), 98 deletions(-)

diff --git a/gossip/src/main/java/org/apache/tuweni/gossip/GossipApp.java b/gossip/src/main/java/org/apache/tuweni/gossip/GossipApp.java
index 0d6509c5a..a14a80681 100644
--- a/gossip/src/main/java/org/apache/tuweni/gossip/GossipApp.java
+++ b/gossip/src/main/java/org/apache/tuweni/gossip/GossipApp.java
@@ -235,6 +235,6 @@ public final class GossipApp {
   }
 
   public void publish(Bytes message) {
-    server.gossip("", message);
+    server.gossip(Collections.emptyMap(), message);
   }
 }
diff --git a/plumtree-servlet/src/integrationTest/java/org/apache/tuweni/plumtree/servlet/GossipServletTest.java b/plumtree-servlet/src/integrationTest/java/org/apache/tuweni/plumtree/servlet/GossipServletTest.java
index 613fbbe2f..0dc8bcb22 100644
--- a/plumtree-servlet/src/integrationTest/java/org/apache/tuweni/plumtree/servlet/GossipServletTest.java
+++ b/plumtree-servlet/src/integrationTest/java/org/apache/tuweni/plumtree/servlet/GossipServletTest.java
@@ -21,6 +21,10 @@ import org.apache.tuweni.plumtree.EphemeralPeerRepository;
 import org.apache.tuweni.plumtree.MessageListener;
 import org.apache.tuweni.plumtree.Peer;
 
+import java.nio.charset.StandardCharsets;
+import java.util.Collections;
+import java.util.Map;
+
 import org.eclipse.jetty.server.Server;
 import org.eclipse.jetty.servlet.ServletContextHandler;
 import org.eclipse.jetty.servlet.ServletHolder;
@@ -33,7 +37,7 @@ public class GossipServletTest {
     public Bytes message;
 
     @Override
-    public void listen(Bytes messageBody, String attributes, Peer peer) {
+    public void listen(Bytes messageBody, Map<String, Bytes> attributes, Peer peer) {
       message = messageBody;
     }
   }
@@ -59,8 +63,7 @@ public class GossipServletTest {
         200,
         200,
         bytes -> bytes,
-        "127.0.0.1",
-        10000,
+        "http://127.0.0.1:10000",
         messageReceived1,
         (message, peer) -> true,
         null,
@@ -69,8 +72,7 @@ public class GossipServletTest {
         200,
         200,
         bytes -> bytes,
-        "127.0.0.1",
-        10001,
+        "http://127.0.0.1:10001",
         messageReceived2,
         (message, peer) -> true,
         null,
@@ -83,8 +85,9 @@ public class GossipServletTest {
     server1.start();
     server2.start();
 
-    gossipServlet.connectTo("127.0.0.1", 10001).join();
-    String attributes = "{\"message_type\": \"BLOCK\"}";
+    gossipServlet.connectTo("http://127.0.0.1:10001").join();
+    Map<String, Bytes> attributes =
+        Collections.singletonMap("message_type", Bytes.wrap("BLOCK".getBytes(StandardCharsets.UTF_8)));
     gossipServlet.gossip(attributes, Bytes.fromHexString("deadbeef"));
     for (int i = 0; i < 10; i++) {
       Thread.sleep(500);
@@ -107,8 +110,7 @@ public class GossipServletTest {
         200,
         200,
         bytes -> bytes,
-        "127.0.0.1",
-        10000,
+        "http://127.0.0.1:10000",
         messageReceived1,
         (message, peer) -> true,
         null,
@@ -117,8 +119,7 @@ public class GossipServletTest {
         200,
         200,
         bytes -> bytes,
-        "127.0.0.1",
-        10001,
+        "http://127.0.0.1:10001",
         messageReceived2,
         (message, peer) -> true,
         null,
@@ -127,8 +128,7 @@ public class GossipServletTest {
         200,
         200,
         bytes -> bytes,
-        "127.0.0.1",
-        10002,
+        "http://127.0.0.1:10002",
         messageReceived3,
         (message, peer) -> true,
         null,
@@ -143,9 +143,10 @@ public class GossipServletTest {
     server2.start();
     server3.start();
 
-    gossipServlet.connectTo("127.0.0.1", 10001).join();
-    gossipServlet3.connectTo("127.0.0.1", 10001).join();
-    String attributes = "{\"message_type\": \"BLOCK\"}";
+    gossipServlet.connectTo("http://127.0.0.1:10001").join();
+    gossipServlet3.connectTo("http://127.0.0.1:10001").join();
+    Map<String, Bytes> attributes =
+        Collections.singletonMap("message_type", Bytes.wrap("BLOCK".getBytes(StandardCharsets.UTF_8)));
     gossipServlet.gossip(attributes, Bytes.fromHexString("deadbeef"));
     for (int i = 0; i < 10; i++) {
       Thread.sleep(500);
@@ -175,8 +176,7 @@ public class GossipServletTest {
         200,
         200,
         bytes -> bytes,
-        "127.0.0.1",
-        10000,
+        "http://127.0.0.1:10000",
         messageReceived1,
         (message, peer) -> true,
         null,
@@ -185,8 +185,7 @@ public class GossipServletTest {
         200,
         200,
         bytes -> bytes,
-        "127.0.0.1",
-        10001,
+        "http://127.0.0.1:10001",
         messageReceived2,
         (message, peer) -> true,
         null,
@@ -195,8 +194,7 @@ public class GossipServletTest {
         200,
         200,
         bytes -> bytes,
-        "127.0.0.1",
-        10002,
+        "http://127.0.0.1:10002",
         messageReceived2,
         (message, peer) -> true,
         null,
@@ -212,12 +210,13 @@ public class GossipServletTest {
 
     try {
 
-      gossipServlet.connectTo("127.0.0.1", 10001).join();
-      gossipServlet2.connectTo("127.0.0.1", 10002).join();
-      gossipServlet.connectTo("127.0.0.1", 10002).join();
+      gossipServlet.connectTo("http://127.0.0.1:10001").join();
+      gossipServlet2.connectTo("http://127.0.0.1:10002").join();
+      gossipServlet.connectTo("http://127.0.0.1:10002").join();
 
       assertEquals(2, peerRepository1.eagerPushPeers().size());
-      String attributes = "{\"message_type\": \"BLOCK\"}";
+      Map<String, Bytes> attributes =
+          Collections.singletonMap("message_type", Bytes.wrap("BLOCK".getBytes(StandardCharsets.UTF_8)));
       gossipServlet.gossip(attributes, Bytes.fromHexString("deadbeef"));
       Thread.sleep(1000);
       assertEquals(Bytes.fromHexString("deadbeef"), messageReceived2.message);
@@ -243,8 +242,7 @@ public class GossipServletTest {
         200,
         200,
         bytes -> bytes,
-        "127.0.0.1",
-        10000,
+        "http://127.0.0.1:10000",
         messageReceived1,
         (message, peer) -> true,
         null,
@@ -253,8 +251,7 @@ public class GossipServletTest {
         200,
         200,
         bytes -> bytes,
-        "127.0.0.1",
-        10001,
+        "http://127.0.0.1:10001",
         messageReceived2,
         (message, peer) -> true,
         null,
@@ -266,9 +263,10 @@ public class GossipServletTest {
     server1.start();
     server2.start();
 
-    gossipServlet.connectTo("127.0.0.1", 10001).join();
+    gossipServlet.connectTo("http://127.0.0.1:10001").join();
     assertEquals(1, peerRepository1.eagerPushPeers().size());
-    String attributes = "{\"message_type\": \"BLOCK\"}";
+    Map<String, Bytes> attributes =
+        Collections.singletonMap("message_type", Bytes.wrap("BLOCK".getBytes(StandardCharsets.UTF_8)));
     gossipServlet.send(peerRepository1.peers().iterator().next(), attributes, Bytes.fromHexString("deadbeef"));
     Thread.sleep(1000);
     assertEquals(Bytes.fromHexString("deadbeef"), messageReceived2.message);
diff --git a/plumtree-servlet/src/main/java/org/apache/tuweni/plumtree/servlet/GossipServlet.java b/plumtree-servlet/src/main/java/org/apache/tuweni/plumtree/servlet/GossipServlet.java
index 4258406c4..f7aa722fa 100644
--- a/plumtree-servlet/src/main/java/org/apache/tuweni/plumtree/servlet/GossipServlet.java
+++ b/plumtree-servlet/src/main/java/org/apache/tuweni/plumtree/servlet/GossipServlet.java
@@ -20,7 +20,7 @@ import org.apache.hc.core5.http.io.entity.ByteArrayEntity;
 import org.apache.tuweni.bytes.Bytes;
 import org.apache.tuweni.concurrent.AsyncCompletion;
 import org.apache.tuweni.concurrent.CompletableAsyncCompletion;
-import org.apache.tuweni.plumtree.MessageHashing;
+import org.apache.tuweni.plumtree.MessageIdentity;
 import org.apache.tuweni.plumtree.MessageListener;
 import org.apache.tuweni.plumtree.MessageSender;
 import org.apache.tuweni.plumtree.MessageValidator;
@@ -31,10 +31,18 @@ import org.apache.tuweni.plumtree.State;
 
 import java.io.IOException;
 import java.io.UncheckedIOException;
+import java.util.Map;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 
+import com.fasterxml.jackson.core.JsonGenerator;
+import com.fasterxml.jackson.core.JsonParser;
+import com.fasterxml.jackson.databind.DeserializationContext;
 import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.SerializerProvider;
+import com.fasterxml.jackson.databind.deser.std.StdDeserializer;
+import com.fasterxml.jackson.databind.module.SimpleModule;
+import com.fasterxml.jackson.databind.ser.std.StdSerializer;
 import jakarta.servlet.ServletConfig;
 import jakarta.servlet.ServletException;
 import jakarta.servlet.http.HttpServlet;
@@ -50,14 +58,51 @@ public class GossipServlet extends HttpServlet {
   private static final Logger logger = LoggerFactory.getLogger(GossipServlet.class);
   private static final ObjectMapper mapper = new ObjectMapper();
 
-  private void sendMessage(MessageSender.Verb verb, String attributes, Peer peer, Bytes hash, Bytes payload) {
+  private final static class BytesSerializer extends StdSerializer<Bytes> {
+
+    public BytesSerializer() {
+      super(Bytes.class);
+    }
+
+    @Override
+    public void serialize(Bytes value, JsonGenerator gen, SerializerProvider provider) throws IOException {
+      gen.writeString(value.toHexString());
+    }
+  }
+  static class BytesDeserializer extends StdDeserializer<Bytes> {
+
+    BytesDeserializer() {
+      super(Bytes.class);
+    }
+
+    @Override
+    public Bytes deserialize(JsonParser p, DeserializationContext ctxt) throws IOException {
+      String value = p.getValueAsString();
+      return Bytes.fromHexStringLenient(value);
+    }
+
+  }
+
+  static {
+    SimpleModule module = new SimpleModule();
+    module.addSerializer(Bytes.class, new BytesSerializer());
+    module.addDeserializer(Bytes.class, new BytesDeserializer());
+    mapper.registerModule(module);
+  }
+
+  private void sendMessage(
+      MessageSender.Verb verb,
+      Map<String, Bytes> attributes,
+      Peer peer,
+      Bytes hash,
+      Bytes payload) {
     Message message = new Message();
     message.verb = verb;
     message.attributes = attributes;
     message.hash = hash.toHexString();
     message.payload = payload == null ? null : payload.toHexString();
-    HttpPost postMessage = new HttpPost("http://" + ((ServletPeer) peer).getAddress());
-    postMessage.setHeader(PLUMTREE_SERVER_HEADER, this.networkInterface + ":" + this.port);
+    HttpPost postMessage = new HttpPost(((ServletPeer) peer).getAddress());
+    postMessage.setHeader(PLUMTREE_SERVER_HEADER, this.externalURL);
     try {
       ByteArrayEntity entity = new ByteArrayEntity(mapper.writeValueAsBytes(message), ContentType.APPLICATION_JSON);
       postMessage.setEntity(entity);
@@ -76,17 +121,17 @@ public class GossipServlet extends HttpServlet {
   }
 
   private static final class Message {
+
     public MessageSender.Verb verb;
-    public String attributes;
+    public Map<String, Bytes> attributes;
     public String hash;
     public String payload;
   }
 
   private final int graftDelay;
   private final int lazyQueueInterval;
-  private final MessageHashing messageHashing;
-  private final String networkInterface;
-  private final int port;
+  private final MessageIdentity messageIdentity;
+  private final String externalURL;
   private final MessageListener payloadListener;
   private final MessageValidator payloadValidator;
   private final PeerPruning peerPruningFunction;
@@ -98,18 +143,16 @@ public class GossipServlet extends HttpServlet {
   public GossipServlet(
       int graftDelay,
       int lazyQueueInterval,
-      MessageHashing messageHashing,
-      String networkInterface,
-      int port,
+      MessageIdentity messageIdentity,
+      String externalURL,
       MessageListener payloadListener,
       MessageValidator payloadValidator,
       PeerPruning peerPruningFunction,
       PeerRepository peerRepository) {
     this.graftDelay = graftDelay;
     this.lazyQueueInterval = lazyQueueInterval;
-    this.messageHashing = messageHashing;
-    this.networkInterface = networkInterface;
-    this.port = port;
+    this.messageIdentity = messageIdentity;
+    this.externalURL = externalURL;
     this.payloadListener = payloadListener;
     this.payloadValidator = payloadValidator == null ? (bytes, peer) -> true : payloadValidator;
     this.peerPruningFunction = peerPruningFunction == null ? (peer) -> true : peerPruningFunction;
@@ -125,7 +168,7 @@ public class GossipServlet extends HttpServlet {
       httpclient = HttpClients.createDefault();
       state = new State(
           peerRepository,
-          messageHashing,
+          messageIdentity,
           this::sendMessage,
           payloadListener,
           payloadValidator,
@@ -200,7 +243,7 @@ public class GossipServlet extends HttpServlet {
    * @param attributes the payload to propagate
    * @param message the payload to propagate
    */
-  public void gossip(String attributes, Bytes message) {
+  public void gossip(Map<String, Bytes> attributes, Bytes message) {
     if (!started.get()) {
       throw new IllegalStateException("Server has not started");
     }
@@ -214,14 +257,14 @@ public class GossipServlet extends HttpServlet {
    * @param attributes the payload to propagate
    * @param message the payload to propagate
    */
-  public void send(Peer peer, String attributes, Bytes message) {
+  public void send(Peer peer, Map<String, Bytes> attributes, Bytes message) {
     if (!started.get()) {
       throw new IllegalStateException("Server has not started");
     }
     state.sendMessage(peer, attributes, message);
   }
 
-  public AsyncCompletion connectTo(String host, int port) {
+  public AsyncCompletion connectTo(String url) {
     if (!started.get()) {
       throw new IllegalStateException("Server has not started");
     }
@@ -229,22 +272,22 @@ public class GossipServlet extends HttpServlet {
     CompletableAsyncCompletion completion = AsyncCompletion.incomplete();
     AtomicInteger counter = new AtomicInteger(0);
 
-    roundConnect(host, port, counter, completion);
+    roundConnect(url, counter, completion);
 
     return completion;
   }
 
-  private void roundConnect(String host, int port, AtomicInteger counter, CompletableAsyncCompletion completion) {
-    ServletPeer peer = new ServletPeer(host + ":" + port);
-    HttpPost postMessage = new HttpPost("http://" + peer.getAddress());
-    postMessage.setHeader(PLUMTREE_SERVER_HEADER, this.networkInterface + ":" + this.port);
+  private void roundConnect(String url, AtomicInteger counter, CompletableAsyncCompletion completion) {
+    ServletPeer peer = new ServletPeer(url);
+    HttpPost postMessage = new HttpPost(peer.getAddress());
+    postMessage.setHeader(PLUMTREE_SERVER_HEADER, this.externalURL);
     try {
       httpclient.execute(postMessage, response -> {
         if (response.getCode() > 299) {
           if (counter.incrementAndGet() > 5) {
             completion.completeExceptionally(new RuntimeException(response.getEntity().toString()));
           } else {
-            roundConnect(host, port, counter, completion);
+            roundConnect(url, counter, completion);
           }
         } else {
           state.addPeer(peer);
@@ -256,7 +299,7 @@ public class GossipServlet extends HttpServlet {
       if (counter.incrementAndGet() > 5) {
         completion.completeExceptionally(e);
       } else {
-        roundConnect(host, port, counter, completion);
+        roundConnect(url, counter, completion);
       }
     }
   }
diff --git a/plumtree-vertx/src/integrationTest/java/org/apache/tuweni/plumtree/vertx/VertxGossipServerTest.java b/plumtree-vertx/src/integrationTest/java/org/apache/tuweni/plumtree/vertx/VertxGossipServerTest.java
index 7f64843af..d16b98fd7 100644
--- a/plumtree-vertx/src/integrationTest/java/org/apache/tuweni/plumtree/vertx/VertxGossipServerTest.java
+++ b/plumtree-vertx/src/integrationTest/java/org/apache/tuweni/plumtree/vertx/VertxGossipServerTest.java
@@ -23,6 +23,10 @@ import org.apache.tuweni.plumtree.EphemeralPeerRepository;
 import org.apache.tuweni.plumtree.MessageListener;
 import org.apache.tuweni.plumtree.Peer;
 
+import java.nio.charset.StandardCharsets;
+import java.util.Collections;
+import java.util.Map;
+
 import io.vertx.core.Vertx;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.ExtendWith;
@@ -35,7 +39,7 @@ class VertxGossipServerTest {
     public Bytes message;
 
     @Override
-    public void listen(Bytes messageBody, String attributes, Peer peer) {
+    public void listen(Bytes messageBody, Map<String, Bytes> attributes, Peer peer) {
       message = messageBody;
     }
   }
@@ -73,7 +77,8 @@ class VertxGossipServerTest {
     server2.start().join();
 
     server1.connectTo("127.0.0.1", 10001).join();
-    String attributes = "{\"message_type\": \"BLOCK\"}";
+    Map<String, Bytes> attributes =
+        Collections.singletonMap("message_type", Bytes.wrap("BLOCK".getBytes(StandardCharsets.UTF_8)));
     server1.gossip(attributes, Bytes.fromHexString("deadbeef"));
     for (int i = 0; i < 10; i++) {
       Thread.sleep(500);
@@ -134,7 +139,8 @@ class VertxGossipServerTest {
 
     server1.connectTo("127.0.0.1", 10001).join();
     server3.connectTo("127.0.0.1", 10001).join();
-    String attributes = "{\"message_type\": \"BLOCK\"}";
+    Map<String, Bytes> attributes =
+        Collections.singletonMap("message_type", Bytes.wrap("BLOCK".getBytes(StandardCharsets.UTF_8)));
     server1.gossip(attributes, Bytes.fromHexString("deadbeef"));
     for (int i = 0; i < 10; i++) {
       Thread.sleep(500);
@@ -202,7 +208,8 @@ class VertxGossipServerTest {
     server2.connectTo("127.0.0.1", 10002).join();
     server1.connectTo("127.0.0.1", 10002).join();
     assertEquals(2, peerRepository1.eagerPushPeers().size());
-    String attributes = "{\"message_type\": \"BLOCK\"}";
+    Map<String, Bytes> attributes =
+        Collections.singletonMap("message_type", Bytes.wrap("BLOCK".getBytes(StandardCharsets.UTF_8)));
     server1.gossip(attributes, Bytes.fromHexString("deadbeef"));
     Thread.sleep(1000);
     assertEquals(Bytes.fromHexString("deadbeef"), messageReceived2.message);
@@ -250,7 +257,8 @@ class VertxGossipServerTest {
 
     server1.connectTo("127.0.0.1", 10001).join();
     assertEquals(1, peerRepository1.eagerPushPeers().size());
-    String attributes = "{\"message_type\": \"BLOCK\"}";
+    Map<String, Bytes> attributes =
+        Collections.singletonMap("message_type", Bytes.wrap("BLOCK".getBytes(StandardCharsets.UTF_8)));
     server1.send(peerRepository1.peers().iterator().next(), attributes, Bytes.fromHexString("deadbeef"));
     Thread.sleep(1000);
     assertEquals(Bytes.fromHexString("deadbeef"), messageReceived2.message);
diff --git a/plumtree-vertx/src/main/java/org/apache/tuweni/plumtree/vertx/VertxGossipServer.java b/plumtree-vertx/src/main/java/org/apache/tuweni/plumtree/vertx/VertxGossipServer.java
index fb08b6eb6..316a1657e 100644
--- a/plumtree-vertx/src/main/java/org/apache/tuweni/plumtree/vertx/VertxGossipServer.java
+++ b/plumtree-vertx/src/main/java/org/apache/tuweni/plumtree/vertx/VertxGossipServer.java
@@ -15,7 +15,7 @@ package org.apache.tuweni.plumtree.vertx;
 import org.apache.tuweni.bytes.Bytes;
 import org.apache.tuweni.concurrent.AsyncCompletion;
 import org.apache.tuweni.concurrent.CompletableAsyncCompletion;
-import org.apache.tuweni.plumtree.MessageHashing;
+import org.apache.tuweni.plumtree.MessageIdentity;
 import org.apache.tuweni.plumtree.MessageListener;
 import org.apache.tuweni.plumtree.MessageSender;
 import org.apache.tuweni.plumtree.MessageValidator;
@@ -25,13 +25,20 @@ import org.apache.tuweni.plumtree.PeerRepository;
 import org.apache.tuweni.plumtree.State;
 
 import java.io.IOException;
+import java.util.Map;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import javax.annotation.Nullable;
 
+import com.fasterxml.jackson.core.JsonGenerator;
 import com.fasterxml.jackson.core.JsonParser;
 import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.DeserializationContext;
 import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.SerializerProvider;
+import com.fasterxml.jackson.databind.deser.std.StdDeserializer;
+import com.fasterxml.jackson.databind.module.SimpleModule;
+import com.fasterxml.jackson.databind.ser.std.StdSerializer;
 import io.vertx.core.Vertx;
 import io.vertx.core.buffer.Buffer;
 import io.vertx.core.net.NetClient;
@@ -47,10 +54,42 @@ public final class VertxGossipServer {
 
   private static final ObjectMapper mapper = new ObjectMapper();
 
+  private final static class BytesSerializer extends StdSerializer<Bytes> {
+
+    public BytesSerializer() {
+      super(Bytes.class);
+    }
+
+    @Override
+    public void serialize(Bytes value, JsonGenerator gen, SerializerProvider provider) throws IOException {
+      gen.writeString(value.toHexString());
+    }
+  }
+  static class BytesDeserializer extends StdDeserializer<Bytes> {
+
+    BytesDeserializer() {
+      super(Bytes.class);
+    }
+
+    @Override
+    public Bytes deserialize(JsonParser p, DeserializationContext ctxt) throws IOException {
+      String value = p.getValueAsString();
+      return Bytes.fromHexStringLenient(value);
+    }
+
+  }
+
+  static {
+    SimpleModule module = new SimpleModule();
+    module.addSerializer(Bytes.class, new BytesSerializer());
+    module.addDeserializer(Bytes.class, new BytesDeserializer());
+    mapper.registerModule(module);
+  }
+
   private static final class Message {
 
     public MessageSender.Verb verb;
-    public String attributes;
+    public Map<String, Bytes> attributes;
     public String hash;
     public String payload;
   }
@@ -69,8 +108,7 @@ public final class VertxGossipServer {
       buffer = Bytes.concatenate(buffer, Bytes.wrapBuffer(data));
       while (!buffer.isEmpty()) {
         Message message;
-        try {
-          JsonParser parser = mapper.getFactory().createParser(buffer.toArrayUnsafe());
+        try (JsonParser parser = mapper.getFactory().createParser(buffer.toArrayUnsafe())) {
           message = parser.readValueAs(Message.class);
           buffer = buffer.slice((int) parser.getCurrentLocation().getByteOffset());
         } catch (IOException e) {
@@ -109,7 +147,7 @@ public final class VertxGossipServer {
   private NetClient client;
   private final int graftDelay;
   private final int lazyQueueInterval;
-  private final MessageHashing messageHashing;
+  private final MessageIdentity messageIdentity;
   private final String networkInterface;
   private final MessageListener payloadListener;
   private final MessageValidator payloadValidator;
@@ -125,7 +163,7 @@ public final class VertxGossipServer {
       Vertx vertx,
       String networkInterface,
       int port,
-      MessageHashing messageHashing,
+      MessageIdentity messageIdentity,
       PeerRepository peerRepository,
       MessageListener payloadListener,
       @Nullable MessageValidator payloadValidator,
@@ -135,7 +173,7 @@ public final class VertxGossipServer {
     this.vertx = vertx;
     this.networkInterface = networkInterface;
     this.port = port;
-    this.messageHashing = messageHashing;
+    this.messageIdentity = messageIdentity;
     this.peerRepository = peerRepository;
     this.payloadListener = payloadListener;
     this.payloadValidator = payloadValidator == null ? (bytes, peer) -> true : payloadValidator;
@@ -159,7 +197,7 @@ public final class VertxGossipServer {
         if (res.failed()) {
           completion.completeExceptionally(res.cause());
         } else {
-          state = new State(peerRepository, messageHashing, (verb, attributes, peer, hash, payload) -> {
+          state = new State(peerRepository, messageIdentity, (verb, attributes, peer, hash, payload) -> {
             vertx.executeBlocking(future -> {
               Message message = new Message();
               message.verb = verb;
@@ -242,7 +280,7 @@ public final class VertxGossipServer {
    * @param attributes the payload to propagate
    * @param message the payload to propagate
    */
-  public void gossip(String attributes, Bytes message) {
+  public void gossip(Map<String, Bytes> attributes, Bytes message) {
     if (!started.get()) {
       throw new IllegalStateException("Server has not started");
     }
@@ -256,7 +294,7 @@ public final class VertxGossipServer {
    * @param attributes the payload to propagate
    * @param message the payload to propagate
    */
-  public void send(Peer peer, String attributes, Bytes message) {
+  public void send(Peer peer, Map<String, Bytes> attributes, Bytes message) {
     if (!started.get()) {
       throw new IllegalStateException("Server has not started");
     }
diff --git a/plumtree/src/main/java/org/apache/tuweni/plumtree/MessageHashing.java b/plumtree/src/main/java/org/apache/tuweni/plumtree/MessageIdentity.java
similarity index 81%
rename from plumtree/src/main/java/org/apache/tuweni/plumtree/MessageHashing.java
rename to plumtree/src/main/java/org/apache/tuweni/plumtree/MessageIdentity.java
index 3257c618f..9fd6cbe82 100644
--- a/plumtree/src/main/java/org/apache/tuweni/plumtree/MessageHashing.java
+++ b/plumtree/src/main/java/org/apache/tuweni/plumtree/MessageIdentity.java
@@ -18,8 +18,14 @@ import org.apache.tuweni.bytes.Bytes;
  * Produces an identifiable footprint for a message (generally a hash) that can be passed on to other peers to identify
  * uniquely a message being propagated.
  */
-public interface MessageHashing {
+public interface MessageIdentity {
 
-  public Bytes hash(Bytes message);
+  /**
+   * Generates the identity of the message
+   * 
+   * @param message the message from which to extract an identity
+   * @return the identity of the message
+   */
+  Bytes identity(Bytes message);
 
 }
diff --git a/plumtree/src/main/java/org/apache/tuweni/plumtree/MessageListener.java b/plumtree/src/main/java/org/apache/tuweni/plumtree/MessageListener.java
index 9cda77f18..b5640b605 100644
--- a/plumtree/src/main/java/org/apache/tuweni/plumtree/MessageListener.java
+++ b/plumtree/src/main/java/org/apache/tuweni/plumtree/MessageListener.java
@@ -14,6 +14,8 @@ package org.apache.tuweni.plumtree;
 
 import org.apache.tuweni.bytes.Bytes;
 
+import java.util.Map;
+
 /**
  * Listens to an incoming message, along with its attributes.
  */
@@ -26,5 +28,5 @@ public interface MessageListener {
    * @param attributes the attributes of the message
    * @param peer the peer we received the message from
    */
-  public void listen(Bytes messageBody, String attributes, Peer peer);
+  public void listen(Bytes messageBody, Map<String, Bytes> attributes, Peer peer);
 }
diff --git a/plumtree/src/main/java/org/apache/tuweni/plumtree/MessageSender.java b/plumtree/src/main/java/org/apache/tuweni/plumtree/MessageSender.java
index 8a8658000..6b8645bd4 100644
--- a/plumtree/src/main/java/org/apache/tuweni/plumtree/MessageSender.java
+++ b/plumtree/src/main/java/org/apache/tuweni/plumtree/MessageSender.java
@@ -14,6 +14,7 @@ package org.apache.tuweni.plumtree;
 
 import org.apache.tuweni.bytes.Bytes;
 
+import java.util.Map;
 import javax.annotation.Nullable;
 
 /**
@@ -56,6 +57,6 @@ public interface MessageSender {
    * @param hash the hash of the message
    * @param payload the bytes to send
    */
-  void sendMessage(Verb verb, String attributes, Peer peer, Bytes hash, @Nullable Bytes payload);
+  void sendMessage(Verb verb, Map<String, Bytes> attributes, Peer peer, Bytes hash, @Nullable Bytes payload);
 
 }
diff --git a/plumtree/src/main/java/org/apache/tuweni/plumtree/State.java b/plumtree/src/main/java/org/apache/tuweni/plumtree/State.java
index b6b67d1e6..6b175fbbf 100644
--- a/plumtree/src/main/java/org/apache/tuweni/plumtree/State.java
+++ b/plumtree/src/main/java/org/apache/tuweni/plumtree/State.java
@@ -37,7 +37,7 @@ public final class State {
   private static final int maxMessagesHandlers = 1000000;
 
   private final PeerRepository peerRepository;
-  private final MessageHashing messageHashingFunction;
+  private final MessageIdentity messageIdentityFunction;
   private final Map<Bytes, MessageHandler> messageHandlers = Collections.synchronizedMap(new LinkedHashMap<>() {
 
     @Override
@@ -54,8 +54,8 @@ public final class State {
   private final Timer timer = new Timer("plumtree", true);
   private final long delay;
 
-  public void sendMessage(Peer peer, String attributes, Bytes message) {
-    Bytes messageHash = messageHashingFunction.hash(message);
+  public void sendMessage(Peer peer, Map<String, Bytes> attributes, Bytes message) {
+    Bytes messageHash = messageIdentityFunction.identity(message);
     messageSender.sendMessage(MessageSender.Verb.SEND, attributes, peer, messageHash, message);
   }
 
@@ -78,7 +78,7 @@ public final class State {
      * @param sender the sender - may be null if we are submitting this message to the network
      * @param message the payload to send to the network
      */
-    void fullMessageReceived(@Nullable Peer sender, String attributes, Bytes message) {
+    void fullMessageReceived(@Nullable Peer sender, Map<String, Bytes> attributes, Bytes message) {
       if (receivedFullMessage.compareAndSet(false, true)) {
         for (TimerTask task : tasks) {
           task.cancel();
@@ -145,7 +145,7 @@ public final class State {
    * Constructor using default time constants.
    * 
    * @param peerRepository the peer repository to use to store and access peer information.
-   * @param messageHashingFunction the function to use to hash messages into hashes to compare them.
+   * @param messageIdentityFunction the function to use to hash messages into hashes to compare them.
    * @param messageSender a function abstracting sending messages to other peers.
    * @param messageListener a function consuming messages when they are gossiped.
    * @param messageValidator a function validating messages before they are gossiped to other peers.
@@ -153,14 +153,14 @@ public final class State {
    */
   public State(
       PeerRepository peerRepository,
-      MessageHashing messageHashingFunction,
+      MessageIdentity messageIdentityFunction,
       MessageSender messageSender,
       MessageListener messageListener,
       MessageValidator messageValidator,
       PeerPruning peerPruningFunction) {
     this(
         peerRepository,
-        messageHashingFunction,
+        messageIdentityFunction,
         messageSender,
         messageListener,
         messageValidator,
@@ -173,7 +173,7 @@ public final class State {
    * Default constructor.
    * 
    * @param peerRepository the peer repository to use to store and access peer information.
-   * @param messageHashingFunction the function to use to hash messages into hashes to compare them.
+   * @param messageIdentityFunction the function to use to hash messages into hashes to compare them.
    * @param messageSender a function abstracting sending messages to other peers.
    * @param messageListener a function consuming messages when they are gossiped.
    * @param messageValidator a function validating messages before they are gossiped to other peers.
@@ -184,7 +184,7 @@ public final class State {
    */
   public State(
       PeerRepository peerRepository,
-      MessageHashing messageHashingFunction,
+      MessageIdentity messageIdentityFunction,
       MessageSender messageSender,
       MessageListener messageListener,
       MessageValidator messageValidator,
@@ -192,7 +192,7 @@ public final class State {
       long graftDelay,
       long lazyQueueInterval) {
     this.peerRepository = peerRepository;
-    this.messageHashingFunction = messageHashingFunction;
+    this.messageIdentityFunction = messageIdentityFunction;
     this.messageSender = messageSender;
     this.messageListener = messageListener;
     this.messageValidator = messageValidator;
@@ -233,8 +233,8 @@ public final class State {
    * @param message the message
    * @param messageHash the hash of the message
    */
-  public void receiveGossipMessage(Peer peer, String attributes, Bytes message, Bytes messageHash) {
-    Bytes checkHash = messageHashingFunction.hash(message);
+  public void receiveGossipMessage(Peer peer, Map<String, Bytes> attributes, Bytes message, Bytes messageHash) {
+    Bytes checkHash = messageIdentityFunction.identity(message);
     if (!checkHash.equals(messageHash)) {
       return;
     }
@@ -281,8 +281,8 @@ public final class State {
    * @param attributes of the message
    * @return The associated hash of the message
    */
-  public Bytes sendGossipMessage(String attributes, Bytes message) {
-    Bytes messageHash = messageHashingFunction.hash(message);
+  public Bytes sendGossipMessage(Map<String, Bytes> attributes, Bytes message) {
+    Bytes messageHash = messageIdentityFunction.identity(message);
     MessageHandler handler = messageHandlers.computeIfAbsent(messageHash, MessageHandler::new);
     handler.fullMessageReceived(null, attributes, message);
     return messageHash;
diff --git a/plumtree/src/test/java/org/apache/tuweni/plumtree/StateTest.java b/plumtree/src/test/java/org/apache/tuweni/plumtree/StateTest.java
index 2e4af8e67..96750f522 100644
--- a/plumtree/src/test/java/org/apache/tuweni/plumtree/StateTest.java
+++ b/plumtree/src/test/java/org/apache/tuweni/plumtree/StateTest.java
@@ -21,6 +21,9 @@ import org.apache.tuweni.bytes.Bytes32;
 import org.apache.tuweni.crypto.Hash;
 import org.apache.tuweni.junit.BouncyCastleExtension;
 
+import java.nio.charset.StandardCharsets;
+import java.util.Collections;
+import java.util.Map;
 import java.util.UUID;
 
 import org.jetbrains.annotations.NotNull;
@@ -47,7 +50,7 @@ class StateTest {
     Bytes payload;
 
     @Override
-    public void sendMessage(Verb verb, String attributes, Peer peer, Bytes hash, Bytes payload) {
+    public void sendMessage(Verb verb, Map<String, Bytes> attributes, Peer peer, Bytes hash, Bytes payload) {
       this.verb = verb;
       this.peer = peer;
       this.hash = hash;
@@ -181,7 +184,8 @@ class StateTest {
     Peer otherPeer = new PeerImpl();
     state.addPeer(otherPeer);
     Bytes32 msg = Bytes32.random();
-    String attributes = "{\"message_type\": \"BLOCK\"}";
+    Map<String, Bytes> attributes =
+        Collections.singletonMap("message_type", Bytes.wrap("BLOCK".getBytes(StandardCharsets.UTF_8)));
     state.receiveGossipMessage(peer, attributes, msg, Hash.keccak256(msg));
     assertEquals(msg, messageSender.payload);
     assertEquals(otherPeer, messageSender.peer);
@@ -201,7 +205,8 @@ class StateTest {
     Peer lazyPeer = new PeerImpl();
     state.addPeer(lazyPeer);
     repo.moveToLazy(lazyPeer);
-    String attributes = "{\"message_type\": \"BLOCK\"}";
+    Map<String, Bytes> attributes =
+        Collections.singletonMap("message_type", Bytes.wrap("BLOCK".getBytes(StandardCharsets.UTF_8)));
     state.receiveGossipMessage(peer, attributes, msg, Hash.keccak256(msg));
     assertEquals(msg, messageSender.payload);
     assertEquals(otherPeer, messageSender.peer);
@@ -225,7 +230,8 @@ class StateTest {
     state.addPeer(lazyPeer);
     repo.moveToLazy(lazyPeer);
     Bytes message = Bytes32.random();
-    String attributes = "{\"message_type\": \"BLOCK\"}";
+    Map<String, Bytes> attributes =
+        Collections.singletonMap("message_type", Bytes.wrap("BLOCK".getBytes(StandardCharsets.UTF_8)));
     state.receiveGossipMessage(peer, attributes, message, Hash.keccak256(message));
     state.receiveIHaveMessage(lazyPeer, message);
     assertNull(messageSender.payload);
@@ -279,7 +285,8 @@ class StateTest {
     Bytes message = Bytes32.random();
     state.receiveIHaveMessage(lazyPeer, Hash.keccak256(message));
     Thread.sleep(100);
-    String attributes = "{\"message_type\": \"BLOCK\"}";
+    Map<String, Bytes> attributes =
+        Collections.singletonMap("message_type", Bytes.wrap("BLOCK".getBytes(StandardCharsets.UTF_8)));
     state.receiveGossipMessage(peer, attributes, message, Hash.keccak256(message));
     Thread.sleep(500);
     assertNull(messageSender.verb);
@@ -295,7 +302,8 @@ class StateTest {
         new State(repo, Hash::keccak256, messageSender, messageListener, (message, peer) -> true, (peer) -> true);
     Peer peer = new PeerImpl();
     Bytes message = Bytes32.random();
-    String attributes = "{\"message_type\": \"BLOCK\"}";
+    Map<String, Bytes> attributes =
+        Collections.singletonMap("message_type", Bytes.wrap("BLOCK".getBytes(StandardCharsets.UTF_8)));
     state.receiveGossipMessage(peer, attributes, message, Hash.keccak256(message));
     assertEquals(1, repo.eagerPushPeers().size());
     assertEquals(0, repo.lazyPushPeers().size());
@@ -311,7 +319,8 @@ class StateTest {
     Peer peer = new PeerImpl();
     Peer secondPeer = new PeerImpl();
     Bytes message = Bytes32.random();
-    String attributes = "{\"message_type\": \"BLOCK\"}";
+    Map<String, Bytes> attributes =
+        Collections.singletonMap("message_type", Bytes.wrap("BLOCK".getBytes(StandardCharsets.UTF_8)));
     state.receiveGossipMessage(peer, attributes, message, Hash.keccak256(message));
     state.receiveGossipMessage(secondPeer, attributes, message, Hash.keccak256(message));
     assertEquals(2, repo.eagerPushPeers().size());


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@tuweni.apache.org
For additional commands, e-mail: commits-help@tuweni.apache.org