You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gossip.apache.org by ec...@apache.org on 2016/10/07 07:08:08 UTC
incubator-gossip git commit: GOSSIP-26 Gossip shared data
Repository: incubator-gossip
Updated Branches:
refs/heads/master f35dddd8f -> 201b101a9
GOSSIP-26 Gossip shared data
Project: http://git-wip-us.apache.org/repos/asf/incubator-gossip/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gossip/commit/201b101a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gossip/tree/201b101a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gossip/diff/201b101a
Branch: refs/heads/master
Commit: 201b101a91cf02d4ef2b0d9536cf0ceda99f6115
Parents: f35dddd
Author: Edward Capriolo <ed...@gmail.com>
Authored: Fri Oct 7 03:04:59 2016 -0400
Committer: Edward Capriolo <ed...@gmail.com>
Committed: Fri Oct 7 03:04:59 2016 -0400
----------------------------------------------------------------------
.../java/org/apache/gossip/GossipService.java | 21 ++++++++-
.../gossip/manager/ActiveGossipThread.java | 42 +++++++++++++++--
.../org/apache/gossip/manager/DataReaper.java | 14 +++++-
.../org/apache/gossip/manager/GossipCore.java | 25 ++++++++++-
.../apache/gossip/manager/GossipManager.java | 23 +++++++++-
src/main/java/org/apache/gossip/model/Base.java | 5 ++-
.../gossip/model/SharedGossipDataMessage.java | 47 ++++++++++++++++++++
.../gossip/udp/UdpSharedGossipDataMessage.java | 31 +++++++++++++
src/test/java/org/apache/gossip/DataTest.java | 47 +++++++++++++++-----
.../apache/gossip/manager/DataReaperTest.java | 25 ++++++++---
10 files changed, 252 insertions(+), 28 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/201b101a/src/main/java/org/apache/gossip/GossipService.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/gossip/GossipService.java b/src/main/java/org/apache/gossip/GossipService.java
index ab0da97..e50f260 100644
--- a/src/main/java/org/apache/gossip/GossipService.java
+++ b/src/main/java/org/apache/gossip/GossipService.java
@@ -24,7 +24,8 @@ import java.util.List;
import org.apache.gossip.event.GossipListener;
import org.apache.gossip.manager.GossipManager;
import org.apache.gossip.manager.random.RandomGossipManager;
-import org.apache.gossip.model.GossipDataMessage;
+import org.apache.gossip.model.GossipDataMessage;
+import org.apache.gossip.model.SharedGossipDataMessage;
import org.apache.log4j.Logger;
/**
@@ -97,7 +98,23 @@ public class GossipService {
* @return return the value if found or null if not found or expired
*/
public GossipDataMessage findPerNodeData(String nodeId, String key){
- return getGossipManager().findGossipData(nodeId, key);
+ return getGossipManager().findPerNodeGossipData(nodeId, key);
}
+ /**
+ * Gossip shared data. Delegates to the gossip manager, which overwrites the
+ * message's nodeId with its own member id and adds the message to the shared data.
+ * @param message the shared datum to publish
+ */
+ public void gossipSharedData(SharedGossipDataMessage message){
+ gossipManager.gossipSharedData(message);
+ }
+
+ /**
+ * Look up a shared datum by key.
+ * @param key the key to search for
+ * @return the value if found, or null if not found or expired
+ */
+ public SharedGossipDataMessage findSharedData(String key){
+ return getGossipManager().findSharedGossipData(key);
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/201b101a/src/main/java/org/apache/gossip/manager/ActiveGossipThread.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/gossip/manager/ActiveGossipThread.java b/src/main/java/org/apache/gossip/manager/ActiveGossipThread.java
index 19caffe..28de244 100644
--- a/src/main/java/org/apache/gossip/manager/ActiveGossipThread.java
+++ b/src/main/java/org/apache/gossip/manager/ActiveGossipThread.java
@@ -34,9 +34,10 @@ import org.apache.gossip.model.ActiveGossipOk;
import org.apache.gossip.model.GossipDataMessage;
import org.apache.gossip.model.GossipMember;
import org.apache.gossip.model.Response;
+import org.apache.gossip.model.SharedGossipDataMessage;
import org.apache.gossip.udp.UdpActiveGossipMessage;
import org.apache.gossip.udp.UdpGossipDataMessage;
-
+import org.apache.gossip.udp.UdpSharedGossipDataMessage;
import org.apache.log4j.Logger;
import org.codehaus.jackson.map.ObjectMapper;
@@ -70,7 +71,10 @@ public class ActiveGossipThread {
() -> sendMembershipList(gossipManager.getMyself(), gossipManager.getDeadMembers()), 0,
gossipManager.getSettings().getGossipInterval(), TimeUnit.MILLISECONDS);
scheduledExecutorService.scheduleAtFixedRate(
- () -> sendData(gossipManager.getMyself(), gossipManager.getLiveMembers()), 0,
+ () -> sendPerNodeData(gossipManager.getMyself(), gossipManager.getLiveMembers()), 0,
+ gossipManager.getSettings().getGossipInterval(), TimeUnit.MILLISECONDS);
+ scheduledExecutorService.scheduleAtFixedRate(
+ () -> sendSharedData(gossipManager.getMyself(), gossipManager.getLiveMembers()), 0,
gossipManager.getSettings().getGossipInterval(), TimeUnit.MILLISECONDS);
}
@@ -83,7 +87,39 @@ public class ActiveGossipThread {
}
}
- public void sendData(LocalGossipMember me, List<LocalGossipMember> memberList){
+  /**
+   * Sends every entry of the shared data map to the single partner returned by
+   * selectPartner(memberList). Does nothing when no partner is available.
+   *
+   * Each entry is copied into a fresh UdpSharedGossipDataMessage with a new uuid and
+   * this member's id as uriFrom. Messages whose JSON form is not smaller than
+   * GossipManager.MAX_PACKET_SIZE are logged and skipped.
+   *
+   * @param me the local member; its id is used as the sender of each message
+   * @param memberList candidate members to pick the gossip partner from
+   */
+  public void sendSharedData(LocalGossipMember me, List<LocalGossipMember> memberList){
+    LocalGossipMember member = selectPartner(memberList);
+    if (member == null) {
+      LOGGER.debug("Send sendSharedData() is called without action");
+      return;
+    }
+    // No DatagramSocket is opened here: delivery goes through gossipCore.sendOneWay.
+    for (SharedGossipDataMessage datum : this.gossipCore.getSharedData().values()){
+      UdpSharedGossipDataMessage message = new UdpSharedGossipDataMessage();
+      message.setUuid(UUID.randomUUID().toString());
+      message.setUriFrom(me.getId());
+      message.setExpireAt(datum.getExpireAt());
+      message.setKey(datum.getKey());
+      message.setNodeId(datum.getNodeId());
+      message.setTimestamp(datum.getTimestamp());
+      message.setPayload(datum.getPayload());
+      try {
+        // Serialize only to measure the packet size before handing the message over.
+        byte[] json_bytes = MAPPER.writeValueAsString(message).getBytes();
+        int packet_length = json_bytes.length;
+        if (packet_length < GossipManager.MAX_PACKET_SIZE) {
+          gossipCore.sendOneWay(message, member.getUri());
+        } else {
+          LOGGER.error("The length of the to be send message is too large ("
+                  + packet_length + " > " + GossipManager.MAX_PACKET_SIZE + ").");
+        }
+      } catch (IOException e1) {
+        // Caught per entry so one unserializable payload does not block the other keys.
+        LOGGER.warn(e1);
+      }
+    }
+  }
+
+ public void sendPerNodeData(LocalGossipMember me, List<LocalGossipMember> memberList){
LocalGossipMember member = selectPartner(memberList);
if (member == null) {
LOGGER.debug("Send sendMembershipList() is called without action");
http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/201b101a/src/main/java/org/apache/gossip/manager/DataReaper.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/gossip/manager/DataReaper.java b/src/main/java/org/apache/gossip/manager/DataReaper.java
index 237ffb6..4f4616b 100644
--- a/src/main/java/org/apache/gossip/manager/DataReaper.java
+++ b/src/main/java/org/apache/gossip/manager/DataReaper.java
@@ -7,6 +7,7 @@ import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.gossip.model.GossipDataMessage;
+import org.apache.gossip.model.SharedGossipDataMessage;
/**
* We wish to periodically sweep user data and remove entries past their timestamp. This
@@ -28,12 +29,21 @@ public class DataReaper {
public void init(){
Runnable reapPerNodeData = () -> {
- runOnce();
+ runPerNodeOnce();
+ runSharedOnce();
};
scheduledExecutor.scheduleAtFixedRate(reapPerNodeData, 0, 5, TimeUnit.SECONDS);
}
- void runOnce(){
+  /**
+   * Sweeps the shared data map once and removes every entry whose expireAt is
+   * earlier than the current clock time.
+   *
+   * Entries without an expireAt are left in place. Unboxing a null here would throw
+   * from the scheduled task and stop all later sweeps.
+   */
+  void runSharedOnce(){
+    long now = clock.currentTimeMillis();
+    for (Entry<String, SharedGossipDataMessage> entry : gossipCore.getSharedData().entrySet()){
+      Long expireAt = entry.getValue().getExpireAt();
+      if (expireAt != null && expireAt < now){
+        // Two-argument remove: only drops the entry if it was not replaced meanwhile.
+        gossipCore.getSharedData().remove(entry.getKey(), entry.getValue());
+      }
+    }
+  }
+
+ void runPerNodeOnce(){
for (Entry<String, ConcurrentHashMap<String, GossipDataMessage>> node : gossipCore.getPerNodeData().entrySet()){
reapData(node.getValue());
}
http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/201b101a/src/main/java/org/apache/gossip/manager/GossipCore.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/gossip/manager/GossipCore.java b/src/main/java/org/apache/gossip/manager/GossipCore.java
index 08ec5b4..6dc4a5c 100644
--- a/src/main/java/org/apache/gossip/manager/GossipCore.java
+++ b/src/main/java/org/apache/gossip/manager/GossipCore.java
@@ -23,11 +23,13 @@ import org.apache.gossip.model.ActiveGossipMessage;
import org.apache.gossip.model.Base;
import org.apache.gossip.model.GossipDataMessage;
import org.apache.gossip.model.Response;
+import org.apache.gossip.model.SharedGossipDataMessage;
import org.apache.gossip.udp.Trackable;
import org.apache.gossip.udp.UdpActiveGossipMessage;
import org.apache.gossip.udp.UdpActiveGossipOk;
import org.apache.gossip.udp.UdpGossipDataMessage;
import org.apache.gossip.udp.UdpNotAMemberFault;
+import org.apache.gossip.udp.UdpSharedGossipDataMessage;
import org.apache.log4j.Logger;
import org.codehaus.jackson.map.ObjectMapper;
@@ -39,24 +41,35 @@ public class GossipCore {
private ConcurrentHashMap<String, Base> requests;
private ExecutorService service;
private final ConcurrentHashMap<String, ConcurrentHashMap<String, GossipDataMessage>> perNodeData;
+ private final ConcurrentHashMap<String, SharedGossipDataMessage> sharedData;
public GossipCore(GossipManager manager){
this.gossipManager = manager;
requests = new ConcurrentHashMap<>();
service = Executors.newFixedThreadPool(500);
perNodeData = new ConcurrentHashMap<>();
+ sharedData = new ConcurrentHashMap<>();
}
+  /**
+   * Stores a shared datum under its key, keeping whichever message has the larger
+   * timestamp. A message whose timestamp is equal to or older than the stored one
+   * is ignored.
+   *
+   * The update is a single atomic merge, so a concurrent writer cannot cause a newer
+   * message to be dropped between the lookup and the write.
+   *
+   * @param message the datum to store; its key and timestamp must not be null
+   */
+  public void addSharedData(SharedGossipDataMessage message){
+    sharedData.merge(message.getKey(), message,
+        (current, incoming) ->
+            current.getTimestamp() < incoming.getTimestamp() ? incoming : current);
+  }
public void addPerNodeData(GossipDataMessage message){
ConcurrentHashMap<String,GossipDataMessage> nodeMap = new ConcurrentHashMap<>();
nodeMap.put(message.getKey(), message);
nodeMap = perNodeData.putIfAbsent(message.getNodeId(), nodeMap);
if (nodeMap != null){
- //m.put(message.getKey(), message); //TODO only put if > ts
GossipDataMessage current = nodeMap.get(message.getKey());
if (current == null){
- nodeMap.replace(message.getKey(), null, message);
+ nodeMap.putIfAbsent(message.getKey(), message);
} else {
if (current.getTimestamp() < message.getTimestamp()){
nodeMap.replace(message.getKey(), current, message);
@@ -69,6 +82,10 @@ public class GossipCore {
return perNodeData;
}
+ /**
+ * @return the shared data keyed by message key; this is the internal map itself, not a copy
+ */
+ public ConcurrentHashMap<String, SharedGossipDataMessage> getSharedData() {
+ return sharedData;
+ }
+
public void shutdown(){
service.shutdown();
try {
@@ -89,6 +106,10 @@ public class GossipCore {
UdpGossipDataMessage message = (UdpGossipDataMessage) base;
addPerNodeData(message);
}
+    if (base instanceof SharedGossipDataMessage){
+      // Cast to the type that was actually tested. addSharedData only needs the model
+      // type, and casting to the Udp subclass would throw ClassCastException for a
+      // plain SharedGossipDataMessage.
+      addSharedData((SharedGossipDataMessage) base);
+    }
if (base instanceof ActiveGossipMessage){
List<GossipMember> remoteGossipMembers = new ArrayList<>();
RemoteGossipMember senderMember = null;
http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/201b101a/src/main/java/org/apache/gossip/manager/GossipManager.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/gossip/manager/GossipManager.java b/src/main/java/org/apache/gossip/manager/GossipManager.java
index 3c66208..9f75fe3 100644
--- a/src/main/java/org/apache/gossip/manager/GossipManager.java
+++ b/src/main/java/org/apache/gossip/manager/GossipManager.java
@@ -44,6 +44,7 @@ import org.apache.gossip.event.GossipState;
import org.apache.gossip.manager.impl.OnlyProcessReceivedPassiveGossipThread;
import org.apache.gossip.model.GossipDataMessage;
+import org.apache.gossip.model.SharedGossipDataMessage;
public abstract class GossipManager implements NotificationListener {
@@ -235,7 +236,15 @@ public abstract class GossipManager implements NotificationListener {
gossipCore.addPerNodeData(message);
}
- public GossipDataMessage findGossipData(String nodeId, String key){
+  /**
+   * Publishes a shared datum from this node: validates it, stamps it with this
+   * member's id and adds it to the shared data held by the gossip core.
+   *
+   * @param message the datum to publish; key, timestamp and payload must be set
+   * @throws NullPointerException if key, timestamp or payload is null
+   */
+  public void gossipSharedData(SharedGossipDataMessage message){
+    // requireNonNull throws on null; Objects.nonNull only returns a boolean, which
+    // was discarded, so nothing was being checked.
+    Objects.requireNonNull(message.getKey(), "key");
+    Objects.requireNonNull(message.getTimestamp(), "timestamp");
+    Objects.requireNonNull(message.getPayload(), "payload");
+    message.setNodeId(me.getId());
+    gossipCore.addSharedData(message);
+  }
+
+ public GossipDataMessage findPerNodeGossipData(String nodeId, String key){
ConcurrentHashMap<String, GossipDataMessage> j = gossipCore.getPerNodeData().get(nodeId);
if (j == null){
return null;
@@ -250,6 +259,18 @@ public abstract class GossipManager implements NotificationListener {
return l;
}
}
+
+  /**
+   * Looks up a shared datum by key.
+   *
+   * @param key the key to search for
+   * @return the stored message, or null when there is no entry for the key or the
+   *         entry's expireAt is earlier than the current clock time
+   */
+  public SharedGossipDataMessage findSharedGossipData(String key){
+    SharedGossipDataMessage found = gossipCore.getSharedData().get(key);
+    if (found == null){
+      return null;
+    }
+    // An entry without expireAt is treated as not expired rather than throwing on unboxing.
+    Long expireAt = found.getExpireAt();
+    if (expireAt != null && expireAt < clock.currentTimeMillis()){
+      return null;
+    }
+    return found;
+  }
public DataReaper getDataReaper() {
return dataReaper;
http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/201b101a/src/main/java/org/apache/gossip/model/Base.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/gossip/model/Base.java b/src/main/java/org/apache/gossip/model/Base.java
index 66c2be6..2bbb7af 100644
--- a/src/main/java/org/apache/gossip/model/Base.java
+++ b/src/main/java/org/apache/gossip/model/Base.java
@@ -4,6 +4,7 @@ import org.apache.gossip.udp.UdpActiveGossipMessage;
import org.apache.gossip.udp.UdpActiveGossipOk;
import org.apache.gossip.udp.UdpGossipDataMessage;
import org.apache.gossip.udp.UdpNotAMemberFault;
+import org.apache.gossip.udp.UdpSharedGossipDataMessage;
import org.codehaus.jackson.annotate.JsonSubTypes;
import org.codehaus.jackson.annotate.JsonSubTypes.Type;
import org.codehaus.jackson.annotate.JsonTypeInfo;
@@ -20,7 +21,9 @@ import org.codehaus.jackson.annotate.JsonTypeInfo;
@Type(value = UdpActiveGossipMessage.class, name = "UdpActiveGossipMessage"),
@Type(value = UdpNotAMemberFault.class, name = "UdpNotAMemberFault"),
@Type(value = GossipDataMessage.class, name = "GossipDataMessage"),
- @Type(value = UdpGossipDataMessage.class, name = "UdpGossipDataMessage")
+ @Type(value = UdpGossipDataMessage.class, name = "UdpGossipDataMessage"),
+ @Type(value = SharedGossipDataMessage.class, name = "SharedGossipDataMessage"),
+ @Type(value = UdpSharedGossipDataMessage.class, name = "UdpSharedGossipDataMessage")
})
public class Base {
http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/201b101a/src/main/java/org/apache/gossip/model/SharedGossipDataMessage.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/gossip/model/SharedGossipDataMessage.java b/src/main/java/org/apache/gossip/model/SharedGossipDataMessage.java
new file mode 100644
index 0000000..bac9ddf
--- /dev/null
+++ b/src/main/java/org/apache/gossip/model/SharedGossipDataMessage.java
@@ -0,0 +1,47 @@
+package org.apache.gossip.model;
+
+/**
+ * A keyed datum that is shared across the cluster rather than stored per node.
+ * Plain mutable bean; all fields are nullable until set.
+ */
+public class SharedGossipDataMessage extends Base {
+
+ // Overwritten by GossipManager.gossipSharedData with the publishing member's id.
+ private String nodeId;
+ // Key under which the message is stored in the shared data map.
+ private String key;
+ // User-supplied value carried by the message.
+ private Object payload;
+ // GossipCore.addSharedData keeps the message with the larger timestamp for a key.
+ private Long timestamp;
+ // Compared against clock.currentTimeMillis(); smaller values count as expired.
+ private Long expireAt;
+
+ public String getNodeId() {
+ return nodeId;
+ }
+ public void setNodeId(String nodeId) {
+ this.nodeId = nodeId;
+ }
+ public String getKey() {
+ return key;
+ }
+ public void setKey(String key) {
+ this.key = key;
+ }
+ public Object getPayload() {
+ return payload;
+ }
+ public void setPayload(Object payload) {
+ this.payload = payload;
+ }
+ public Long getTimestamp() {
+ return timestamp;
+ }
+ public void setTimestamp(Long timestamp) {
+ this.timestamp = timestamp;
+ }
+ public Long getExpireAt() {
+ return expireAt;
+ }
+ public void setExpireAt(Long expireAt) {
+ this.expireAt = expireAt;
+ }
+ @Override
+ public String toString() {
+ return "SharedGossipDataMessage [nodeId=" + nodeId + ", key=" + key + ", payload=" + payload
+ + ", timestamp=" + timestamp + ", expireAt=" + expireAt + "]";
+ }
+}
+
http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/201b101a/src/main/java/org/apache/gossip/udp/UdpSharedGossipDataMessage.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/gossip/udp/UdpSharedGossipDataMessage.java b/src/main/java/org/apache/gossip/udp/UdpSharedGossipDataMessage.java
new file mode 100644
index 0000000..cb99759
--- /dev/null
+++ b/src/main/java/org/apache/gossip/udp/UdpSharedGossipDataMessage.java
@@ -0,0 +1,31 @@
+package org.apache.gossip.udp;
+
+import org.apache.gossip.model.SharedGossipDataMessage;
+
+/**
+ * Wire form of a SharedGossipDataMessage: adds the sender (uriFrom) and a
+ * per-message uuid required by Trackable.
+ */
+public class UdpSharedGossipDataMessage extends SharedGossipDataMessage implements Trackable {
+
+  private String uriFrom;
+  private String uuid;
+
+  public String getUriFrom() {
+    return uriFrom;
+  }
+
+  public void setUriFrom(String uriFrom) {
+    this.uriFrom = uriFrom;
+  }
+
+  public String getUuid() {
+    return uuid;
+  }
+
+  public void setUuid(String uuid) {
+    this.uuid = uuid;
+  }
+
+  /**
+   * Includes the inherited key, payload and timestamps so a logged message can be
+   * identified, not just its transport fields.
+   */
+  @Override
+  public String toString() {
+    return "UdpSharedGossipDataMessage [uriFrom=" + uriFrom + ", uuid=" + uuid + ", "
+            + super.toString() + "]";
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/201b101a/src/test/java/org/apache/gossip/DataTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/gossip/DataTest.java b/src/test/java/org/apache/gossip/DataTest.java
index 02c89a8..4909bf8 100644
--- a/src/test/java/org/apache/gossip/DataTest.java
+++ b/src/test/java/org/apache/gossip/DataTest.java
@@ -12,6 +12,7 @@ import java.util.concurrent.TimeUnit;
import org.apache.gossip.event.GossipListener;
import org.apache.gossip.event.GossipState;
import org.apache.gossip.model.GossipDataMessage;
+import org.apache.gossip.model.SharedGossipDataMessage;
import org.junit.Test;
import io.teknek.tunit.TUnit;
@@ -19,7 +20,7 @@ import io.teknek.tunit.TUnit;
public class DataTest {
@Test
- public void abc() throws InterruptedException, UnknownHostException, URISyntaxException{
+ public void dataTest() throws InterruptedException, UnknownHostException, URISyntaxException{
GossipSettings settings = new GossipSettings();
String cluster = UUID.randomUUID().toString();
int seedNodes = 1;
@@ -51,20 +52,32 @@ public class DataTest {
return total;
}}).afterWaitingAtMost(20, TimeUnit.SECONDS).isEqualTo(2);
clients.get(0).gossipPerNodeData(msg());
+ clients.get(0).gossipSharedData(sharedMsg());
Thread.sleep(10000);
TUnit.assertThat(
-
- new Callable<Object> (){
+ new Callable<Object>() {
public Object call() throws Exception {
- GossipDataMessage x = clients.get(1).findPerNodeData(1+"" , "a");
- if (x == null) return "";
- else return x.getPayload();
- }})
-
-
- //() -> clients.get(1).findGossipData(1+"" , "a").getPayload())
- .afterWaitingAtMost(20, TimeUnit.SECONDS)
- .isEqualTo("b");
+ GossipDataMessage x = clients.get(1).findPerNodeData(1 + "", "a");
+ if (x == null)
+ return "";
+ else
+ return x.getPayload();
+ }
+ }).afterWaitingAtMost(20, TimeUnit.SECONDS).isEqualTo("b");
+
+
+ TUnit.assertThat(
+ new Callable<Object>() {
+ public Object call() throws Exception {
+ SharedGossipDataMessage x = clients.get(1).findSharedData("a");
+ if (x == null)
+ return "";
+ else
+ return x.getPayload();
+ }
+ }).afterWaitingAtMost(20, TimeUnit.SECONDS).isEqualTo("c");
+
+
for (int i = 0; i < clusterMembers; ++i) {
clients.get(i).shutdown();
}
@@ -78,4 +91,14 @@ public class DataTest {
g.setTimestamp(System.currentTimeMillis());
return g;
}
+
+ private SharedGossipDataMessage sharedMsg(){
+ SharedGossipDataMessage g = new SharedGossipDataMessage();
+ g.setExpireAt(Long.MAX_VALUE);
+ g.setKey("a");
+ g.setPayload("c");
+ g.setTimestamp(System.currentTimeMillis());
+ return g;
+ }
+
}
http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/201b101a/src/test/java/org/apache/gossip/manager/DataReaperTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/gossip/manager/DataReaperTest.java b/src/test/java/org/apache/gossip/manager/DataReaperTest.java
index 4cd5dfe..d0164b1 100644
--- a/src/test/java/org/apache/gossip/manager/DataReaperTest.java
+++ b/src/test/java/org/apache/gossip/manager/DataReaperTest.java
@@ -5,6 +5,7 @@ import java.net.URI;
import org.apache.gossip.GossipSettings;
import org.apache.gossip.manager.random.RandomGossipManager;
import org.apache.gossip.model.GossipDataMessage;
+import org.apache.gossip.model.SharedGossipDataMessage;
import org.junit.Assert;
import org.junit.Test;
@@ -21,9 +22,13 @@ public class DataReaperTest {
GossipManager gm = RandomGossipManager.newBuilder().cluster("abc").settings(settings)
.withId(myId).uri(URI.create("udp://localhost:5000")).build();
gm.gossipPerNodeData(perNodeDatum(key, value));
- Assert.assertEquals(value, gm.findGossipData(myId, key).getPayload());
- gm.getDataReaper().runOnce();
- TUnit.assertThat(() -> gm.findGossipData(myId, key)).equals(null);
+ gm.gossipSharedData(sharedDatum(key, value));
+ Assert.assertEquals(value, gm.findPerNodeGossipData(myId, key).getPayload());
+ Assert.assertEquals(value, gm.findSharedGossipData(key).getPayload());
+ gm.getDataReaper().runPerNodeOnce();
+ gm.getDataReaper().runSharedOnce();
+ TUnit.assertThat(() -> gm.findPerNodeGossipData(myId, key)).equals(null);
+ TUnit.assertThat(() -> gm.findSharedGossipData(key)).equals(null);
}
private GossipDataMessage perNodeDatum(String key, String value) {
@@ -34,6 +39,16 @@ public class DataReaperTest {
m.setTimestamp(System.currentTimeMillis());
return m;
}
+
+ private SharedGossipDataMessage sharedDatum(String key, String value) {
+ SharedGossipDataMessage m = new SharedGossipDataMessage();
+ m.setExpireAt(System.currentTimeMillis() + 5L);
+ m.setKey(key);
+ m.setPayload(value);
+ m.setTimestamp(System.currentTimeMillis());
+ return m;
+ }
+
@Test
public void testHigherTimestampWins() {
@@ -47,9 +62,9 @@ public class DataReaperTest {
GossipDataMessage after = perNodeDatum(key, "b");
after.setTimestamp(after.getTimestamp() - 1);
gm.gossipPerNodeData(before);
- Assert.assertEquals(value, gm.findGossipData(myId, key).getPayload());
+ Assert.assertEquals(value, gm.findPerNodeGossipData(myId, key).getPayload());
gm.gossipPerNodeData(after);
- Assert.assertEquals(value, gm.findGossipData(myId, key).getPayload());
+ Assert.assertEquals(value, gm.findPerNodeGossipData(myId, key).getPayload());
}
}