You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tuweni.apache.org by to...@apache.org on 2019/04/26 00:23:51 UTC
[incubator-tuweni] 13/48: Add a sample integration example to
plumtree
This is an automated email from the ASF dual-hosted git repository.
toulmean pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-tuweni.git
commit 1da45edfe6114b0632690e00e1f4728c45572944
Author: Antoine Toulme <to...@apache.org>
AuthorDate: Wed Apr 3 16:58:53 2019 -0700
Add a sample integration example to plumtree
---
plumtree/build.gradle | 4 +
.../{StateActor.java => MessageValidator.java} | 18 +-
.../main/java/org/apache/tuweni/plumtree/Peer.java | 5 +-
.../java/org/apache/tuweni/plumtree/State.java | 115 +++++++++---
.../SocketPeer.java} | 26 ++-
.../tuweni/plumtree/vertx/VertxGossipServer.java | 208 +++++++++++++++++++++
.../java/org/apache/tuweni/plumtree/StateTest.java | 92 ++++-----
.../plumtree/vertx/VertxGossipServerTest.java | 165 ++++++++++++++++
8 files changed, 562 insertions(+), 71 deletions(-)
diff --git a/plumtree/build.gradle b/plumtree/build.gradle
index 9d6569f..041b8bb 100644
--- a/plumtree/build.gradle
+++ b/plumtree/build.gradle
@@ -2,9 +2,13 @@ description = 'Plumtree - Push-Lazy-pUsh Multicast TREE, an implementation of Ep
dependencies {
compile project(':bytes')
+ compile project(':concurrent')
compile project(':crypto')
+ compileOnly 'io.vertx:vertx-core'
+
testCompile project(':junit')
+ testCompile 'io.vertx:vertx-core'
testCompile 'org.bouncycastle:bcprov-jdk15on'
testCompile 'org.junit.jupiter:junit-jupiter-api'
testCompile 'org.junit.jupiter:junit-jupiter-params'
diff --git a/plumtree/src/main/java/org/apache/tuweni/plumtree/StateActor.java b/plumtree/src/main/java/org/apache/tuweni/plumtree/MessageValidator.java
similarity index 63%
rename from plumtree/src/main/java/org/apache/tuweni/plumtree/StateActor.java
rename to plumtree/src/main/java/org/apache/tuweni/plumtree/MessageValidator.java
index dc5029d..d17132d 100644
--- a/plumtree/src/main/java/org/apache/tuweni/plumtree/StateActor.java
+++ b/plumtree/src/main/java/org/apache/tuweni/plumtree/MessageValidator.java
@@ -12,5 +12,21 @@
*/
package org.apache.tuweni.plumtree;
-public interface StateActor {
+import org.apache.tuweni.bytes.Bytes;
+
+/**
+ * Validator for a message and a peer.
+ *
+ * This validator is called prior to gossiping the message from that peer to other peers.
+ */
+public interface MessageValidator {
+
+ /**
+ * Validates that the message from the peer is valid.
+ *
+ * @param message the payload sent over the network
+ * @param peer the peer that sent the message
+ * @return true if the message is valid
+ */
+ boolean validate(Bytes message, Peer peer);
}
diff --git a/plumtree/src/main/java/org/apache/tuweni/plumtree/Peer.java b/plumtree/src/main/java/org/apache/tuweni/plumtree/Peer.java
index bf42d86..10e87b3 100644
--- a/plumtree/src/main/java/org/apache/tuweni/plumtree/Peer.java
+++ b/plumtree/src/main/java/org/apache/tuweni/plumtree/Peer.java
@@ -12,5 +12,8 @@
*/
package org.apache.tuweni.plumtree;
-public class Peer {
+/**
+ * A peer that is part of the gossip system.
+ */
+public interface Peer {
}
diff --git a/plumtree/src/main/java/org/apache/tuweni/plumtree/State.java b/plumtree/src/main/java/org/apache/tuweni/plumtree/State.java
index 36ecd37..f35f358 100644
--- a/plumtree/src/main/java/org/apache/tuweni/plumtree/State.java
+++ b/plumtree/src/main/java/org/apache/tuweni/plumtree/State.java
@@ -15,15 +15,18 @@ package org.apache.tuweni.plumtree;
import org.apache.tuweni.bytes.Bytes;
import java.util.ArrayList;
+import java.util.Collections;
+import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.Timer;
import java.util.TimerTask;
-import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.Consumer;
import java.util.stream.Collectors;
+import javax.annotation.Nullable;
/**
* Local state to our peer, representing the make-up of the tree of peers.
@@ -32,8 +35,19 @@ public final class State {
private final PeerRepository peerRepository;
private final MessageHashing messageHashingFunction;
- private final Map<Bytes, MessageHandler> messageHandlers = new ConcurrentHashMap<>();
+ private final int maxMessagesHandlers = 1000000;
+ private final Map<Bytes, MessageHandler> messageHandlers =
+ Collections.synchronizedMap(new LinkedHashMap<Bytes, MessageHandler>() {
+
+ @Override
+ protected boolean removeEldestEntry(final Map.Entry<Bytes, MessageHandler> eldest) {
+ return super.size() > maxMessagesHandlers;
+ }
+ });
+
private final MessageSender messageSender;
+ private final Consumer<Bytes> messageListener;
+ private final MessageValidator messageValidator;
private final Queue<Runnable> lazyQueue = new ConcurrentLinkedQueue<>();
private final Timer timer = new Timer("plumtree", true);
private final long delay;
@@ -41,6 +55,7 @@ public final class State {
final class MessageHandler {
private final Bytes hash;
+
private final AtomicBoolean receivedFullMessage = new AtomicBoolean(false);
private final AtomicBoolean requestingGraftMessage = new AtomicBoolean(false);
private List<TimerTask> tasks = new ArrayList<>();
@@ -50,26 +65,38 @@ public final class State {
this.hash = hash;
}
- void fullMessageReceived(Peer sender, Bytes message) {
+ /**
+ * Acts on receiving the full message.
+ *
+ * @param sender the sender - may be null if we are submitting this message to the network
+ * @param message the payload to send to the network
+ */
+ void fullMessageReceived(@Nullable Peer sender, Bytes message) {
if (receivedFullMessage.compareAndSet(false, true)) {
for (TimerTask task : tasks) {
task.cancel();
}
- for (Peer peer : peerRepository.eagerPushPeers()) {
- if (!sender.equals(peer)) {
- messageSender.sendMessage(MessageSender.Verb.GOSSIP, peer, message);
+
+ if (sender == null || messageValidator.validate(message, sender)) {
+ for (Peer peer : peerRepository.eagerPushPeers()) {
+ if (sender == null || !sender.equals(peer)) {
+ messageSender.sendMessage(MessageSender.Verb.GOSSIP, peer, message);
+ }
}
+ lazyQueue.addAll(
+ peerRepository
+ .lazyPushPeers()
+ .stream()
+ .filter(p -> !lazyPeers.contains(p))
+ .map(peer -> (Runnable) (() -> messageSender.sendMessage(MessageSender.Verb.IHAVE, peer, hash)))
+ .collect(Collectors.toList()));
+ messageListener.accept(message);
}
- lazyQueue.addAll(
- peerRepository
- .lazyPushPeers()
- .stream()
- .filter(p -> !lazyPeers.contains(p))
- .map(peer -> (Runnable) (() -> messageSender.sendMessage(MessageSender.Verb.IHAVE, peer, hash)))
- .collect(Collectors.toList()));
} else {
- messageSender.sendMessage(MessageSender.Verb.PRUNE, sender, null);
- peerRepository.moveToLazy(sender);
+ if (sender != null) {
+ messageSender.sendMessage(MessageSender.Verb.PRUNE, sender, null);
+ peerRepository.moveToLazy(sender);
+ }
}
}
@@ -99,19 +126,49 @@ public final class State {
}
}
- public State(PeerRepository peerRepository, MessageHashing messageHashingFunction, MessageSender messageSender) {
- this(peerRepository, messageHashingFunction, messageSender, 5000, 5000);
+ /**
+ * Constructor using default time constants.
+ *
+ * @param peerRepository the peer repository to use to store and access peer information.
+ * @param messageHashingFunction the function to use to hash messages into hashes to compare them.
+ * @param messageSender a function abstracting sending messages to other peers.
+ * @param messageListener a function consuming messages when they are gossiped.
+ * @param messageValidator a function validating messages before they are gossiped to other peers.
+ */
+ public State(
+ PeerRepository peerRepository,
+ MessageHashing messageHashingFunction,
+ MessageSender messageSender,
+ Consumer<Bytes> messageListener,
+ MessageValidator messageValidator) {
+ this(peerRepository, messageHashingFunction, messageSender, messageListener, messageValidator, 5000, 5000);
}
+ /**
+ * Constructor using explicit time constants.
+ *
+ * @param peerRepository the peer repository to use to store and access peer information.
+ * @param messageHashingFunction the function to use to hash messages into hashes to compare them.
+ * @param messageSender a function abstracting sending messages to other peers.
+ * @param messageListener a function consuming messages when they are gossiped.
+ * @param messageValidator a function validating messages before they are gossiped to other peers.
+ * @param graftDelay delay in milliseconds to apply before this peer grafts another peer when it finds that peer has
+ * data it is missing.
+ * @param lazyQueueInterval the interval in milliseconds between sending messages to lazy peers.
+ */
public State(
PeerRepository peerRepository,
MessageHashing messageHashingFunction,
MessageSender messageSender,
+ Consumer<Bytes> messageListener,
+ MessageValidator messageValidator,
long graftDelay,
long lazyQueueInterval) {
this.peerRepository = peerRepository;
this.messageHashingFunction = messageHashingFunction;
this.messageSender = messageSender;
+ this.messageListener = messageListener;
+ this.messageValidator = messageValidator;
this.delay = graftDelay;
timer.schedule(new TimerTask() {
@Override
@@ -132,7 +189,7 @@ public final class State {
/**
* Removes a peer from the collection of peers we are connected to.
- *
+ *
* @param peer the peer to remove
*/
public void removePeer(Peer peer) {
@@ -141,31 +198,31 @@ public final class State {
}
/**
- * Records a message was received in full from a peer
+ * Records a message was received in full from a peer.
*
* @param peer the peer that sent the message
* @param message the full message payload received from the peer
*/
- public void receiveFullMessage(Peer peer, Bytes message) {
+ public void receiveGossipMessage(Peer peer, Bytes message) {
peerRepository.considerNewPeer(peer);
MessageHandler handler = messageHandlers.computeIfAbsent(messageHashingFunction.hash(message), MessageHandler::new);
handler.fullMessageReceived(peer, message);
}
/**
- * Records a message was partially received from a peer
+ * Records a message was partially received from a peer.
*
* @param peer the peer that sent the message
* @param messageHash the hash of the message
*/
- public void receiveHeaderMessage(Peer peer, Bytes messageHash) {
+ public void receiveIHaveMessage(Peer peer, Bytes messageHash) {
MessageHandler handler = messageHandlers.computeIfAbsent(messageHash, MessageHandler::new);
handler.partialMessageReceived(peer);
}
/**
* Requests a peer be pruned away from the eager peers into the lazy peers
- *
+ *
* @param peer the peer to move to lazy peers
*/
public void receivePruneMessage(Peer peer) {
@@ -174,7 +231,7 @@ public final class State {
/**
* Requests a peer be grafted to the eager peers list
- *
+ *
* @param peer the peer to add to the eager peers
* @param messageHash the hash of the message that triggers this grafting
*/
@@ -183,6 +240,16 @@ public final class State {
messageSender.sendMessage(MessageSender.Verb.GOSSIP, peer, messageHash);
}
+ /**
+ * Sends a gossip message to all peers, according to their status.
+ *
+ * @param message the message to propagate
+ */
+ public void sendGossipMessage(Bytes message) {
+ MessageHandler handler = messageHandlers.computeIfAbsent(messageHashingFunction.hash(message), MessageHandler::new);
+ handler.fullMessageReceived(null, message);
+ }
+
void processQueue() {
for (Runnable r : lazyQueue) {
r.run();
diff --git a/plumtree/src/main/java/org/apache/tuweni/plumtree/StateActorFactory.java b/plumtree/src/main/java/org/apache/tuweni/plumtree/vertx/SocketPeer.java
similarity index 63%
rename from plumtree/src/main/java/org/apache/tuweni/plumtree/StateActorFactory.java
rename to plumtree/src/main/java/org/apache/tuweni/plumtree/vertx/SocketPeer.java
index 61d076b..d5322c2 100644
--- a/plumtree/src/main/java/org/apache/tuweni/plumtree/StateActorFactory.java
+++ b/plumtree/src/main/java/org/apache/tuweni/plumtree/vertx/SocketPeer.java
@@ -10,9 +10,29 @@
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
-package org.apache.tuweni.plumtree;
+package org.apache.tuweni.plumtree.vertx;
-public interface StateActorFactory {
+import org.apache.tuweni.plumtree.Peer;
- public StateActor create(State state);
+import io.vertx.core.net.NetSocket;
+
+/**
+ * Vert.x gossip peer associated with a socket
+ */
+final class SocketPeer implements Peer {
+
+ private final NetSocket socket;
+
+ SocketPeer(NetSocket socket) {
+ this.socket = socket;
+ }
+
+ NetSocket socket() {
+ return socket;
+ }
+
+ @Override
+ public String toString() {
+ return socket.localAddress().toString();
+ }
}
diff --git a/plumtree/src/main/java/org/apache/tuweni/plumtree/vertx/VertxGossipServer.java b/plumtree/src/main/java/org/apache/tuweni/plumtree/vertx/VertxGossipServer.java
new file mode 100644
index 0000000..303ac60
--- /dev/null
+++ b/plumtree/src/main/java/org/apache/tuweni/plumtree/vertx/VertxGossipServer.java
@@ -0,0 +1,208 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.tuweni.plumtree.vertx;
+
+import org.apache.tuweni.bytes.Bytes;
+import org.apache.tuweni.concurrent.AsyncCompletion;
+import org.apache.tuweni.concurrent.CompletableAsyncCompletion;
+import org.apache.tuweni.plumtree.MessageHashing;
+import org.apache.tuweni.plumtree.MessageSender;
+import org.apache.tuweni.plumtree.MessageValidator;
+import org.apache.tuweni.plumtree.Peer;
+import org.apache.tuweni.plumtree.PeerRepository;
+import org.apache.tuweni.plumtree.State;
+
+import java.io.IOException;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.Consumer;
+
+import com.fasterxml.jackson.core.JsonParser;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import io.vertx.core.Vertx;
+import io.vertx.core.buffer.Buffer;
+import io.vertx.core.net.NetClient;
+import io.vertx.core.net.NetServer;
+
+/**
+ * Vert.x implementation of the plumtree gossip.
+ *
+ * This implementation is provided as an example and relies on a simplistic JSON serialization of messages.
+ *
+ */
+public final class VertxGossipServer {
+
+ private static final ObjectMapper mapper = new ObjectMapper();
+
+ private static final class Message {
+
+ public MessageSender.Verb verb;
+ public String payload;
+ }
+ private final class SocketHandler {
+
+ private final Peer peer;
+
+ SocketHandler(Peer peer) {
+ this.peer = peer;
+ state.addPeer(peer);
+ }
+
+ private Bytes buffer = Bytes.EMPTY;
+
+ void handle(Buffer data) {
+ buffer = Bytes.concatenate(buffer, Bytes.wrapBuffer(data));
+ Message message;
+ try {
+ JsonParser parser = mapper.getFactory().createParser(buffer.toArrayUnsafe());
+ message = parser.readValueAs(Message.class);
+ buffer = buffer.slice((int) parser.getCurrentLocation().getByteOffset());
+ } catch (IOException e) {
+ return;
+ }
+
+ switch (message.verb) {
+ case IHAVE:
+ state.receiveIHaveMessage(peer, Bytes.fromHexString(message.payload));
+ break;
+ case GOSSIP:
+ state.receiveGossipMessage(peer, Bytes.fromHexString(message.payload));
+ break;
+ case GRAFT:
+ state.receiveGraftMessage(peer, Bytes.fromHexString(message.payload));
+ break;
+ case PRUNE:
+ state.receivePruneMessage(peer);
+ break;
+ }
+ }
+
+ void close(Void aVoid) {
+ state.removePeer(peer);
+ }
+ }
+
+ private final AtomicBoolean started = new AtomicBoolean(false);
+ private final Vertx vertx;
+ private final int port;
+ private final MessageHashing messageHashing;
+ private final String networkInterface;
+ private final PeerRepository peerRepository;
+ private final Consumer<Bytes> payloadListener;
+ private final MessageValidator payloadValidator;
+ private State state;
+ private NetServer server;
+ private NetClient client;
+
+ public VertxGossipServer(
+ Vertx vertx,
+ String networkInterface,
+ int port,
+ MessageHashing messageHashing,
+ PeerRepository peerRepository,
+ Consumer<Bytes> payloadListener,
+ MessageValidator payloadValidator) {
+ this.vertx = vertx;
+ this.networkInterface = networkInterface;
+ this.port = port;
+ this.messageHashing = messageHashing;
+ this.peerRepository = peerRepository;
+ this.payloadListener = payloadListener;
+ this.payloadValidator = payloadValidator;
+ }
+
+ public AsyncCompletion start() {
+ if (started.compareAndSet(false, true)) {
+ CompletableAsyncCompletion completion = AsyncCompletion.incomplete();
+ server = vertx.createNetServer();
+ client = vertx.createNetClient();
+ server.connectHandler(socket -> {
+ Peer peer = new SocketPeer(socket);
+ SocketHandler handler = new SocketHandler(peer);
+ socket.handler(handler::handle).closeHandler(handler::close);
+ });
+ server.listen(port, networkInterface, res -> {
+ if (res.failed()) {
+ completion.completeExceptionally(res.cause());
+ } else {
+ state = new State(peerRepository, messageHashing, (verb, peer, payload) -> {
+ Message message = new Message();
+ message.verb = verb;
+ message.payload = payload == null ? null : payload.toHexString();
+ try {
+ ((SocketPeer) peer).socket().write(Buffer.buffer(mapper.writeValueAsBytes(message)));
+ } catch (JsonProcessingException e) {
+ throw new RuntimeException(e);
+ }
+ }, payloadListener, payloadValidator);
+
+ completion.complete();
+ }
+ });
+
+ return completion;
+ } else {
+ return AsyncCompletion.completed();
+ }
+ }
+
+ public AsyncCompletion stop() {
+ if (started.compareAndSet(true, false)) {
+ CompletableAsyncCompletion completion = AsyncCompletion.incomplete();
+
+ state.stop();
+ client.close();
+ server.close(res -> {
+ if (res.failed()) {
+ completion.completeExceptionally(res.cause());
+ } else {
+ completion.complete();
+ }
+ });
+
+ return completion;
+ }
+ return AsyncCompletion.completed();
+ }
+
+ /**
+ * Connects to a remote gossip server and registers the resulting socket as a peer.
+ *
+ * @param host the host of the remote server
+ * @param port the port of the remote server
+ * @return a completion that completes once the connection is established and the peer has been registered, or
+ * completes exceptionally with the connection failure cause
+ * @throws IllegalStateException if this server has not been started
+ */
+ public AsyncCompletion connectTo(String host, int port) {
+ if (!started.get()) {
+ throw new IllegalStateException("Server has not started");
+ }
+ CompletableAsyncCompletion completion = AsyncCompletion.incomplete();
+ client.connect(port, host, res -> {
+ if (res.failed()) {
+ completion.completeExceptionally(res.cause());
+ } else {
+ // Register the peer and wire the socket handlers before signalling completion, so that a caller which
+ // gossips as soon as the returned completion finishes already sees the new peer.
+ Peer peer = new SocketPeer(res.result());
+ SocketHandler handler = new SocketHandler(peer);
+ res.result().handler(handler::handle).closeHandler(handler::close);
+ completion.complete();
+ }
+ });
+
+ return completion;
+ }
+
+ /**
+ * Gossip a message to all known peers.
+ *
+ * @param message the payload to propagate
+ */
+ public void gossip(Bytes message) {
+ if (!started.get()) {
+ throw new IllegalStateException("Server has not started");
+ }
+ state.sendGossipMessage(message);
+ }
+}
diff --git a/plumtree/src/test/java/org/apache/tuweni/plumtree/StateTest.java b/plumtree/src/test/java/org/apache/tuweni/plumtree/StateTest.java
index ef4cade..3dfb206 100644
--- a/plumtree/src/test/java/org/apache/tuweni/plumtree/StateTest.java
+++ b/plumtree/src/test/java/org/apache/tuweni/plumtree/StateTest.java
@@ -21,12 +21,18 @@ import org.apache.tuweni.bytes.Bytes32;
import org.apache.tuweni.crypto.Hash;
import org.apache.tuweni.junit.BouncyCastleExtension;
+import java.util.concurrent.atomic.AtomicReference;
+
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
@ExtendWith(BouncyCastleExtension.class)
class StateTest {
+ private static class PeerImpl implements Peer {
+
+ }
+
private static class MockMessageSender implements MessageSender {
Verb verb;
@@ -42,10 +48,12 @@ class StateTest {
}
}
+ private static final AtomicReference<Bytes> messageRef = new AtomicReference<>();
+
@Test
void testInitialState() {
EphemeralPeerRepository repo = new EphemeralPeerRepository();
- State state = new State(repo, Hash::keccak256, new MockMessageSender());
+ State state = new State(repo, Hash::keccak256, new MockMessageSender(), messageRef::set, (message, peer) -> true);
assertTrue(repo.peers().isEmpty());
assertTrue(repo.lazyPushPeers().isEmpty());
assertTrue(repo.eagerPushPeers().isEmpty());
@@ -54,10 +62,10 @@ class StateTest {
@Test
void firstRoundWithThreePeers() {
EphemeralPeerRepository repo = new EphemeralPeerRepository();
- State state = new State(repo, Hash::keccak256, new MockMessageSender());
- state.addPeer(new Peer());
- state.addPeer(new Peer());
- state.addPeer(new Peer());
+ State state = new State(repo, Hash::keccak256, new MockMessageSender(), messageRef::set, (message, peer) -> true);
+ state.addPeer(new PeerImpl());
+ state.addPeer(new PeerImpl());
+ state.addPeer(new PeerImpl());
assertTrue(repo.lazyPushPeers().isEmpty());
assertEquals(3, repo.eagerPushPeers().size());
}
@@ -65,8 +73,8 @@ class StateTest {
@Test
void removePeer() {
EphemeralPeerRepository repo = new EphemeralPeerRepository();
- State state = new State(repo, Hash::keccak256, new MockMessageSender());
- Peer peer = new Peer();
+ State state = new State(repo, Hash::keccak256, new MockMessageSender(), messageRef::set, (message, peer) -> true);
+ Peer peer = new PeerImpl();
state.addPeer(peer);
state.removePeer(peer);
assertTrue(repo.peers().isEmpty());
@@ -77,8 +85,8 @@ class StateTest {
@Test
void prunePeer() {
EphemeralPeerRepository repo = new EphemeralPeerRepository();
- State state = new State(repo, Hash::keccak256, new MockMessageSender());
- Peer peer = new Peer();
+ State state = new State(repo, Hash::keccak256, new MockMessageSender(), messageRef::set, (message, peer) -> true);
+ Peer peer = new PeerImpl();
state.addPeer(peer);
state.receivePruneMessage(peer);
assertEquals(0, repo.eagerPushPeers().size());
@@ -88,8 +96,8 @@ class StateTest {
@Test
void graftPeer() {
EphemeralPeerRepository repo = new EphemeralPeerRepository();
- State state = new State(repo, Hash::keccak256, new MockMessageSender());
- Peer peer = new Peer();
+ State state = new State(repo, Hash::keccak256, new MockMessageSender(), messageRef::set, (message, peer) -> true);
+ Peer peer = new PeerImpl();
state.addPeer(peer);
state.receivePruneMessage(peer);
assertEquals(0, repo.eagerPushPeers().size());
@@ -103,13 +111,13 @@ class StateTest {
void receiveFullMessageFromEagerPeer() {
EphemeralPeerRepository repo = new EphemeralPeerRepository();
MockMessageSender messageSender = new MockMessageSender();
- State state = new State(repo, Hash::keccak256, messageSender);
- Peer peer = new Peer();
+ State state = new State(repo, Hash::keccak256, messageSender, messageRef::set, (message, peer) -> true);
+ Peer peer = new PeerImpl();
state.addPeer(peer);
- Peer otherPeer = new Peer();
+ Peer otherPeer = new PeerImpl();
state.addPeer(otherPeer);
Bytes32 msg = Bytes32.random();
- state.receiveFullMessage(peer, msg);
+ state.receiveGossipMessage(peer, msg);
assertEquals(msg, messageSender.payload);
assertEquals(otherPeer, messageSender.peer);
}
@@ -118,16 +126,16 @@ class StateTest {
void receiveFullMessageFromEagerPeerWithALazyPeer() {
EphemeralPeerRepository repo = new EphemeralPeerRepository();
MockMessageSender messageSender = new MockMessageSender();
- State state = new State(repo, Hash::keccak256, messageSender);
- Peer peer = new Peer();
+ State state = new State(repo, Hash::keccak256, messageSender, messageRef::set, (message, peer) -> true);
+ Peer peer = new PeerImpl();
state.addPeer(peer);
- Peer otherPeer = new Peer();
+ Peer otherPeer = new PeerImpl();
state.addPeer(otherPeer);
Bytes32 msg = Bytes32.random();
- Peer lazyPeer = new Peer();
+ Peer lazyPeer = new PeerImpl();
state.addPeer(lazyPeer);
repo.moveToLazy(lazyPeer);
- state.receiveFullMessage(peer, msg);
+ state.receiveGossipMessage(peer, msg);
assertEquals(msg, messageSender.payload);
assertEquals(otherPeer, messageSender.peer);
state.processQueue();
@@ -140,15 +148,15 @@ class StateTest {
void receiveFullMessageFromEagerPeerThenPartialMessageFromLazyPeer() {
EphemeralPeerRepository repo = new EphemeralPeerRepository();
MockMessageSender messageSender = new MockMessageSender();
- State state = new State(repo, Hash::keccak256, messageSender);
- Peer peer = new Peer();
+ State state = new State(repo, Hash::keccak256, messageSender, messageRef::set, (message, peer) -> true);
+ Peer peer = new PeerImpl();
state.addPeer(peer);
- Peer lazyPeer = new Peer();
+ Peer lazyPeer = new PeerImpl();
state.addPeer(lazyPeer);
repo.moveToLazy(lazyPeer);
Bytes message = Bytes32.random();
- state.receiveFullMessage(peer, message);
- state.receiveHeaderMessage(lazyPeer, message);
+ state.receiveGossipMessage(peer, message);
+ state.receiveIHaveMessage(lazyPeer, message);
assertNull(messageSender.payload);
assertNull(messageSender.peer);
}
@@ -157,14 +165,14 @@ class StateTest {
void receivePartialMessageFromLazyPeerAndNoFullMessage() throws Exception {
EphemeralPeerRepository repo = new EphemeralPeerRepository();
MockMessageSender messageSender = new MockMessageSender();
- State state = new State(repo, Hash::keccak256, messageSender, 100, 4000);
- Peer peer = new Peer();
+ State state = new State(repo, Hash::keccak256, messageSender, messageRef::set, (message, peer) -> true, 100, 4000);
+ Peer peer = new PeerImpl();
state.addPeer(peer);
- Peer lazyPeer = new Peer();
+ Peer lazyPeer = new PeerImpl();
state.addPeer(lazyPeer);
repo.moveToLazy(lazyPeer);
Bytes message = Bytes32.random();
- state.receiveHeaderMessage(lazyPeer, message);
+ state.receiveIHaveMessage(lazyPeer, message);
Thread.sleep(200);
assertEquals(message, messageSender.payload);
assertEquals(lazyPeer, messageSender.peer);
@@ -175,16 +183,16 @@ class StateTest {
void receivePartialMessageFromLazyPeerAndThenFullMessage() throws Exception {
EphemeralPeerRepository repo = new EphemeralPeerRepository();
MockMessageSender messageSender = new MockMessageSender();
- State state = new State(repo, Hash::keccak256, messageSender, 500, 4000);
- Peer peer = new Peer();
+ State state = new State(repo, Hash::keccak256, messageSender, messageRef::set, (message, peer) -> true, 500, 4000);
+ Peer peer = new PeerImpl();
state.addPeer(peer);
- Peer lazyPeer = new Peer();
+ Peer lazyPeer = new PeerImpl();
state.addPeer(lazyPeer);
repo.moveToLazy(lazyPeer);
Bytes message = Bytes32.random();
- state.receiveHeaderMessage(lazyPeer, Hash.keccak256(message));
+ state.receiveIHaveMessage(lazyPeer, Hash.keccak256(message));
Thread.sleep(100);
- state.receiveFullMessage(peer, message);
+ state.receiveGossipMessage(peer, message);
Thread.sleep(500);
assertNull(messageSender.verb);
assertNull(messageSender.payload);
@@ -195,10 +203,10 @@ class StateTest {
void receiveFullMessageFromUnknownPeer() {
EphemeralPeerRepository repo = new EphemeralPeerRepository();
MockMessageSender messageSender = new MockMessageSender();
- State state = new State(repo, Hash::keccak256, messageSender);
- Peer peer = new Peer();
+ State state = new State(repo, Hash::keccak256, messageSender, messageRef::set, (message, peer) -> true);
+ Peer peer = new PeerImpl();
Bytes message = Bytes32.random();
- state.receiveFullMessage(peer, message);
+ state.receiveGossipMessage(peer, message);
assertEquals(1, repo.eagerPushPeers().size());
assertEquals(0, repo.lazyPushPeers().size());
assertEquals(peer, repo.eagerPushPeers().iterator().next());
@@ -208,12 +216,12 @@ class StateTest {
void prunePeerWhenReceivingTwiceTheSameFullMessage() {
EphemeralPeerRepository repo = new EphemeralPeerRepository();
MockMessageSender messageSender = new MockMessageSender();
- State state = new State(repo, Hash::keccak256, messageSender);
- Peer peer = new Peer();
- Peer secondPeer = new Peer();
+ State state = new State(repo, Hash::keccak256, messageSender, messageRef::set, (message, peer) -> true);
+ Peer peer = new PeerImpl();
+ Peer secondPeer = new PeerImpl();
Bytes message = Bytes32.random();
- state.receiveFullMessage(peer, message);
- state.receiveFullMessage(secondPeer, message);
+ state.receiveGossipMessage(peer, message);
+ state.receiveGossipMessage(secondPeer, message);
assertEquals(1, repo.eagerPushPeers().size());
assertEquals(1, repo.lazyPushPeers().size());
assertNull(messageSender.payload);
diff --git a/plumtree/src/test/java/org/apache/tuweni/plumtree/vertx/VertxGossipServerTest.java b/plumtree/src/test/java/org/apache/tuweni/plumtree/vertx/VertxGossipServerTest.java
new file mode 100644
index 0000000..f465afb
--- /dev/null
+++ b/plumtree/src/test/java/org/apache/tuweni/plumtree/vertx/VertxGossipServerTest.java
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.tuweni.plumtree.vertx;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import org.apache.tuweni.bytes.Bytes;
+import org.apache.tuweni.junit.VertxExtension;
+import org.apache.tuweni.junit.VertxInstance;
+import org.apache.tuweni.plumtree.EphemeralPeerRepository;
+
+import java.util.concurrent.atomic.AtomicReference;
+
+import io.vertx.core.Vertx;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+
+@ExtendWith(VertxExtension.class)
+class VertxGossipServerTest {
+
+ @Test
+ void gossipDeadBeefToOtherNode(@VertxInstance Vertx vertx) throws Exception {
+
+ AtomicReference<Bytes> messageReceived1 = new AtomicReference<>();
+ AtomicReference<Bytes> messageReceived2 = new AtomicReference<>();
+
+ VertxGossipServer server1 = new VertxGossipServer(
+ vertx,
+ "127.0.0.1",
+ 10000,
+ bytes -> bytes,
+ new EphemeralPeerRepository(),
+ messageReceived1::set,
+ (message, peer) -> true);
+ VertxGossipServer server2 = new VertxGossipServer(
+ vertx,
+ "127.0.0.1",
+ 10001,
+ bytes -> bytes,
+ new EphemeralPeerRepository(),
+ messageReceived2::set,
+ (message, peer) -> true);
+
+ server1.start().join();
+ server2.start().join();
+
+ server1.connectTo("127.0.0.1", 10001).join();
+ server1.gossip(Bytes.fromHexString("deadbeef"));
+ Thread.sleep(1000);
+ assertEquals(Bytes.fromHexString("deadbeef"), messageReceived2.get());
+
+ server1.stop().join();
+ server2.stop().join();
+ }
+
+ /**
+ * Gossips a message from server1 and checks that it reaches server2 and, through server2, server3.
+ */
+ @Test
+ void gossipDeadBeefToTwoOtherNodes(@VertxInstance Vertx vertx) throws Exception {
+
+ AtomicReference<Bytes> messageReceived1 = new AtomicReference<>();
+ AtomicReference<Bytes> messageReceived2 = new AtomicReference<>();
+ AtomicReference<Bytes> messageReceived3 = new AtomicReference<>();
+
+ VertxGossipServer server1 = new VertxGossipServer(
+ vertx,
+ "127.0.0.1",
+ 10000,
+ bytes -> bytes,
+ new EphemeralPeerRepository(),
+ messageReceived1::set,
+ (message, peer) -> true);
+ VertxGossipServer server2 = new VertxGossipServer(
+ vertx,
+ "127.0.0.1",
+ 10001,
+ bytes -> bytes,
+ new EphemeralPeerRepository(),
+ messageReceived2::set,
+ (message, peer) -> true);
+ VertxGossipServer server3 = new VertxGossipServer(
+ vertx,
+ "127.0.0.1",
+ 10002,
+ bytes -> bytes,
+ new EphemeralPeerRepository(),
+ messageReceived3::set,
+ (message, peer) -> true);
+
+ server1.start().join();
+ server2.start().join();
+ server3.start().join();
+
+ server1.connectTo("127.0.0.1", 10001).join();
+ server3.connectTo("127.0.0.1", 10001).join();
+ server1.gossip(Bytes.fromHexString("deadbeef"));
+ Thread.sleep(1000);
+ assertEquals(Bytes.fromHexString("deadbeef"), messageReceived2.get());
+ assertEquals(Bytes.fromHexString("deadbeef"), messageReceived3.get());
+
+ // Stop every server that was started, so port 10002 is released for the tests that bind it afterwards.
+ server1.stop().join();
+ server2.stop().join();
+ server3.stop().join();
+ }
+
+ @Test
+ void gossipCollision(@VertxInstance Vertx vertx) throws Exception {
+ AtomicReference<Bytes> messageReceived1 = new AtomicReference<>();
+ AtomicReference<Bytes> messageReceived2 = new AtomicReference<>();
+
+ EphemeralPeerRepository peerRepository1 = new EphemeralPeerRepository();
+ EphemeralPeerRepository peerRepository3 = new EphemeralPeerRepository();
+
+ VertxGossipServer server1 = new VertxGossipServer(
+ vertx,
+ "127.0.0.1",
+ 10000,
+ bytes -> bytes,
+ peerRepository1,
+ messageReceived1::set,
+ (message, peer) -> true);
+ VertxGossipServer server2 = new VertxGossipServer(
+ vertx,
+ "127.0.0.1",
+ 10001,
+ bytes -> bytes,
+ new EphemeralPeerRepository(),
+ messageReceived2::set,
+ (message, peer) -> true);
+ VertxGossipServer server3 = new VertxGossipServer(
+ vertx,
+ "127.0.0.1",
+ 10002,
+ bytes -> bytes,
+ peerRepository3,
+ messageReceived2::set,
+ (message, peer) -> true);
+
+ server1.start().join();
+ server2.start().join();
+ server3.start().join();
+
+ server1.connectTo("127.0.0.1", 10001).join();
+ server2.connectTo("127.0.0.1", 10002).join();
+ server1.connectTo("127.0.0.1", 10002).join();
+ server1.gossip(Bytes.fromHexString("deadbeef"));
+ Thread.sleep(1000);
+ assertEquals(Bytes.fromHexString("deadbeef"), messageReceived2.get());
+ Thread.sleep(1000);
+
+ assertTrue(peerRepository1.lazyPushPeers().size() == 1 || peerRepository3.lazyPushPeers().size() == 1);
+
+ server1.stop().join();
+ server2.stop().join();
+ server3.stop().join();
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@tuweni.apache.org
For additional commands, e-mail: commits-help@tuweni.apache.org