You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gossip.apache.org by ec...@apache.org on 2016/10/07 06:05:30 UTC
incubator-gossip git commit: GOSSIP-25 Create reaper process to
expire per-node data
Repository: incubator-gossip
Updated Branches:
refs/heads/master daea6edb1 -> f35dddd8f
GOSSIP-25 Create reaper process to expire per-node data
Project: http://git-wip-us.apache.org/repos/asf/incubator-gossip/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gossip/commit/f35dddd8
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gossip/tree/f35dddd8
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gossip/diff/f35dddd8
Branch: refs/heads/master
Commit: f35dddd8f21e3e7f9549a742644dba5250374d29
Parents: daea6ed
Author: Edward Capriolo <ed...@gmail.com>
Authored: Fri Oct 7 02:03:43 2016 -0400
Committer: Edward Capriolo <ed...@gmail.com>
Committed: Fri Oct 7 02:03:43 2016 -0400
----------------------------------------------------------------------
.../java/org/apache/gossip/GossipService.java | 30 +++++-----
.../java/org/apache/gossip/manager/Clock.java | 8 +++
.../org/apache/gossip/manager/DataReaper.java | 58 ++++++++++++++++++++
.../org/apache/gossip/manager/GossipCore.java | 23 +++++---
.../apache/gossip/manager/GossipManager.java | 31 +++++++++--
.../gossip/manager/PassiveGossipThread.java | 8 +--
.../org/apache/gossip/manager/SystemClock.java | 15 +++++
src/test/java/org/apache/gossip/DataTest.java | 6 +-
.../org/apache/gossip/ShutdownDeadtimeTest.java | 12 ++--
.../org/apache/gossip/StartupSettingsTest.java | 4 +-
.../org/apache/gossip/TenNodeThreeSeedTest.java | 2 +-
.../apache/gossip/manager/DataReaperTest.java | 55 +++++++++++++++++++
12 files changed, 206 insertions(+), 46 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/f35dddd8/src/main/java/org/apache/gossip/GossipService.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/gossip/GossipService.java b/src/main/java/org/apache/gossip/GossipService.java
index 6c02e2c..ab0da97 100644
--- a/src/main/java/org/apache/gossip/GossipService.java
+++ b/src/main/java/org/apache/gossip/GossipService.java
@@ -24,19 +24,18 @@ import java.util.List;
import org.apache.gossip.event.GossipListener;
import org.apache.gossip.manager.GossipManager;
import org.apache.gossip.manager.random.RandomGossipManager;
-import org.apache.gossip.model.GossipDataMessage;
+import org.apache.gossip.model.GossipDataMessage;
import org.apache.log4j.Logger;
/**
* This object represents the service which is responsible for gossiping with other gossip members.
*
- * @author joshclemm, harmenw
*/
public class GossipService {
public static final Logger LOGGER = Logger.getLogger(GossipService.class);
- private GossipManager gossipManager;
+ private final GossipManager gossipManager;
/**
* Constructor with the default settings.
@@ -71,7 +70,7 @@ public class GossipService {
}
public void start() {
- LOGGER.debug("Starting: " + get_gossipManager().getMyself().getUri());
+ LOGGER.debug("Starting: " + getGossipManager().getMyself().getUri());
gossipManager.init();
}
@@ -79,25 +78,26 @@ public class GossipService {
gossipManager.shutdown();
}
- public GossipManager get_gossipManager() {
+ public GossipManager getGossipManager() {
return gossipManager;
}
/**
- * Gossip data to the entire cluster
+ * Gossip data in a namespace that is per-node { node-id { key->value } }
* @param message
*/
- public void gossipData(GossipDataMessage message){
- gossipManager.gossipData(message);
+ public void gossipPerNodeData(GossipDataMessage message){
+ gossipManager.gossipPerNodeData(message);
}
-
- public GossipDataMessage findGossipData(String nodeId, String key){
- return this.get_gossipManager().findGossipData(nodeId, key);
- }
-
- public void set_gossipManager(GossipManager _gossipManager) {
- this.gossipManager = _gossipManager;
+ /**
+ * Retrieve per-node gossip data by key
+ * @param nodeId
+ * @param key
+ * @return return the value if found or null if not found or expired
+ */
+ public GossipDataMessage findPerNodeData(String nodeId, String key){
+ return getGossipManager().findGossipData(nodeId, key);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/f35dddd8/src/main/java/org/apache/gossip/manager/Clock.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/gossip/manager/Clock.java b/src/main/java/org/apache/gossip/manager/Clock.java
new file mode 100644
index 0000000..0e828f7
--- /dev/null
+++ b/src/main/java/org/apache/gossip/manager/Clock.java
@@ -0,0 +1,8 @@
+package org.apache.gossip.manager;
+
+public interface Clock {
+
+ long currentTimeMillis();
+ long nanoTime();
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/f35dddd8/src/main/java/org/apache/gossip/manager/DataReaper.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/gossip/manager/DataReaper.java b/src/main/java/org/apache/gossip/manager/DataReaper.java
new file mode 100644
index 0000000..237ffb6
--- /dev/null
+++ b/src/main/java/org/apache/gossip/manager/DataReaper.java
@@ -0,0 +1,58 @@
+package org.apache.gossip.manager;
+
+import java.util.Map.Entry;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.gossip.model.GossipDataMessage;
+
+/**
+ * We wish to periodically sweep user data and remove entries past their timestamp. This
+ * implementation periodically sweeps through the data and removes old entries. While it might make
+ * sense to use a more specific high performance data-structure to handle eviction, keep in mind
+ * that we are not looking to store a large quantity of data as we currently have to transmit this
+ * data cluster wide.
+ */
+public class DataReaper {
+
+ private final GossipCore gossipCore;
+ private final ScheduledExecutorService scheduledExecutor = Executors.newScheduledThreadPool(1);
+ private final Clock clock;
+
+ public DataReaper(GossipCore gossipCore, Clock clock){
+ this.gossipCore = gossipCore;
+ this.clock = clock;
+ }
+
+ public void init(){
+ Runnable reapPerNodeData = () -> {
+ runOnce();
+ };
+ scheduledExecutor.scheduleAtFixedRate(reapPerNodeData, 0, 5, TimeUnit.SECONDS);
+ }
+
+ void runOnce(){
+ for (Entry<String, ConcurrentHashMap<String, GossipDataMessage>> node : gossipCore.getPerNodeData().entrySet()){
+ reapData(node.getValue());
+ }
+ }
+
+ void reapData(ConcurrentHashMap<String, GossipDataMessage> concurrentHashMap){
+ for (Entry<String, GossipDataMessage> entry : concurrentHashMap.entrySet()){
+ if (entry.getValue().getExpireAt() < clock.currentTimeMillis()){
+ concurrentHashMap.remove(entry.getKey(), entry.getValue());
+ }
+ }
+ }
+
+ public void close(){
+ scheduledExecutor.shutdown();
+ try {
+ scheduledExecutor.awaitTermination(5, TimeUnit.SECONDS);
+ } catch (InterruptedException e) {
+
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/f35dddd8/src/main/java/org/apache/gossip/manager/GossipCore.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/gossip/manager/GossipCore.java b/src/main/java/org/apache/gossip/manager/GossipCore.java
index 46d855a..08ec5b4 100644
--- a/src/main/java/org/apache/gossip/manager/GossipCore.java
+++ b/src/main/java/org/apache/gossip/manager/GossipCore.java
@@ -47,16 +47,21 @@ public class GossipCore {
perNodeData = new ConcurrentHashMap<>();
}
- /**
- *
- * @param message
- */
+
public void addPerNodeData(GossipDataMessage message){
- ConcurrentHashMap<String,GossipDataMessage> m = new ConcurrentHashMap<>();
- m.put(message.getKey(), message);
- m = perNodeData.putIfAbsent(message.getNodeId(), m);
- if (m != null){
- m.put(message.getKey(), message); //TODO only put if > ts
+ ConcurrentHashMap<String,GossipDataMessage> nodeMap = new ConcurrentHashMap<>();
+ nodeMap.put(message.getKey(), message);
+ nodeMap = perNodeData.putIfAbsent(message.getNodeId(), nodeMap);
+ if (nodeMap != null){
+ //m.put(message.getKey(), message); //TODO only put if > ts
+ GossipDataMessage current = nodeMap.get(message.getKey());
+ if (current == null){
+ nodeMap.replace(message.getKey(), null, message);
+ } else {
+ if (current.getTimestamp() < message.getTimestamp()){
+ nodeMap.replace(message.getKey(), current, message);
+ }
+ }
}
}
http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/f35dddd8/src/main/java/org/apache/gossip/manager/GossipManager.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/gossip/manager/GossipManager.java b/src/main/java/org/apache/gossip/manager/GossipManager.java
index 94b57d1..3c66208 100644
--- a/src/main/java/org/apache/gossip/manager/GossipManager.java
+++ b/src/main/java/org/apache/gossip/manager/GossipManager.java
@@ -22,6 +22,7 @@ import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map.Entry;
+import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.ExecutorService;
@@ -67,14 +68,20 @@ public abstract class GossipManager implements NotificationListener {
private ExecutorService gossipThreadExecutor;
- private GossipCore gossipCore;
+ private final GossipCore gossipCore;
+
+ private final DataReaper dataReaper;
+
+ private final Clock clock;
public GossipManager(String cluster,
URI uri, String id, GossipSettings settings,
List<GossipMember> gossipMembers, GossipListener listener) {
this.settings = settings;
- this.gossipCore = new GossipCore(this);
+ gossipCore = new GossipCore(this);
+ clock = new SystemClock();
+ dataReaper = new DataReaper(gossipCore, clock);
me = new LocalGossipMember(cluster, uri, id, System.currentTimeMillis(), this,
settings.getCleanupInterval());
members = new ConcurrentSkipListMap<>();
@@ -192,6 +199,7 @@ public abstract class GossipManager implements NotificationListener {
gossipThreadExecutor.execute(passiveGossipThread);
activeGossipThread = new ActiveGossipThread(this, this.gossipCore);
activeGossipThread.init();
+ dataReaper.init();
GossipService.LOGGER.debug("The GossipService is started.");
}
@@ -202,6 +210,7 @@ public abstract class GossipManager implements NotificationListener {
gossipServiceRunning.set(false);
gossipThreadExecutor.shutdown();
gossipCore.shutdown();
+ dataReaper.close();
if (passiveGossipThread != null) {
passiveGossipThread.shutdown();
}
@@ -218,7 +227,10 @@ public abstract class GossipManager implements NotificationListener {
}
}
- public void gossipData(GossipDataMessage message){
+ public void gossipPerNodeData(GossipDataMessage message){
+ Objects.nonNull(message.getKey());
+ Objects.nonNull(message.getTimestamp());
+ Objects.nonNull(message.getPayload());
message.setNodeId(me.getId());
gossipCore.addPerNodeData(message);
}
@@ -228,8 +240,19 @@ public abstract class GossipManager implements NotificationListener {
if (j == null){
return null;
} else {
- return j.get(key);
+ GossipDataMessage l = j.get(key);
+ if (l == null){
+ return null;
+ }
+ if (l.getExpireAt() != null && l.getExpireAt() < clock.currentTimeMillis()) {
+ return null;
+ }
+ return l;
}
}
+
+ public DataReaper getDataReaper() {
+ return dataReaper;
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/f35dddd8/src/main/java/org/apache/gossip/manager/PassiveGossipThread.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/gossip/manager/PassiveGossipThread.java b/src/main/java/org/apache/gossip/manager/PassiveGossipThread.java
index 11c371e..f2dce0b 100644
--- a/src/main/java/org/apache/gossip/manager/PassiveGossipThread.java
+++ b/src/main/java/org/apache/gossip/manager/PassiveGossipThread.java
@@ -23,15 +23,11 @@ import java.net.DatagramSocket;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.net.SocketException;
-import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
-import org.apache.gossip.GossipMember;
-import org.apache.gossip.GossipService;
import org.apache.gossip.model.Base;
import org.apache.log4j.Logger;
import org.codehaus.jackson.map.ObjectMapper;
-import org.apache.gossip.RemoteGossipMember;
/**
* [The passive thread: reply to incoming gossip request.] This class handles the passive cycle,
@@ -107,9 +103,9 @@ abstract public class PassiveGossipThread implements Runnable {
}
private void debug(int packetLength, byte[] jsonBytes) {
- if (GossipService.LOGGER.isDebugEnabled()){
+ if (LOGGER.isDebugEnabled()){
String receivedMessage = new String(jsonBytes);
- GossipService.LOGGER.debug("Received message (" + packetLength + " bytes): "
+ LOGGER.debug("Received message (" + packetLength + " bytes): "
+ receivedMessage);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/f35dddd8/src/main/java/org/apache/gossip/manager/SystemClock.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/gossip/manager/SystemClock.java b/src/main/java/org/apache/gossip/manager/SystemClock.java
new file mode 100644
index 0000000..6d113b7
--- /dev/null
+++ b/src/main/java/org/apache/gossip/manager/SystemClock.java
@@ -0,0 +1,15 @@
+package org.apache.gossip.manager;
+
+public class SystemClock implements Clock {
+
+ @Override
+ public long currentTimeMillis() {
+ return System.currentTimeMillis();
+ }
+
+ @Override
+ public long nanoTime() {
+ return System.nanoTime();
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/f35dddd8/src/test/java/org/apache/gossip/DataTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/gossip/DataTest.java b/src/test/java/org/apache/gossip/DataTest.java
index 6260f9b..02c89a8 100644
--- a/src/test/java/org/apache/gossip/DataTest.java
+++ b/src/test/java/org/apache/gossip/DataTest.java
@@ -46,17 +46,17 @@ public class DataTest {
public Integer call() throws Exception {
int total = 0;
for (int i = 0; i < clusterMembers; ++i) {
- total += clients.get(i).get_gossipManager().getLiveMembers().size();
+ total += clients.get(i).getGossipManager().getLiveMembers().size();
}
return total;
}}).afterWaitingAtMost(20, TimeUnit.SECONDS).isEqualTo(2);
- clients.get(0).gossipData(msg());
+ clients.get(0).gossipPerNodeData(msg());
Thread.sleep(10000);
TUnit.assertThat(
new Callable<Object> (){
public Object call() throws Exception {
- GossipDataMessage x = clients.get(1).findGossipData(1+"" , "a");
+ GossipDataMessage x = clients.get(1).findPerNodeData(1+"" , "a");
if (x == null) return "";
else return x.getPayload();
}})
http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/f35dddd8/src/test/java/org/apache/gossip/ShutdownDeadtimeTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/gossip/ShutdownDeadtimeTest.java b/src/test/java/org/apache/gossip/ShutdownDeadtimeTest.java
index 82cb625..383657d 100644
--- a/src/test/java/org/apache/gossip/ShutdownDeadtimeTest.java
+++ b/src/test/java/org/apache/gossip/ShutdownDeadtimeTest.java
@@ -78,7 +78,7 @@ public class ShutdownDeadtimeTest {
public Integer call() throws Exception {
int total = 0;
for (int i = 0; i < clusterMembers; ++i) {
- total += clients.get(i).get_gossipManager().getLiveMembers().size();
+ total += clients.get(i).getGossipManager().getLiveMembers().size();
}
return total;
}
@@ -88,15 +88,15 @@ public class ShutdownDeadtimeTest {
Random r = new Random();
int randomClientId = r.nextInt(clusterMembers);
log.info("shutting down " + randomClientId);
- final int shutdownPort = clients.get(randomClientId).get_gossipManager().getMyself().getUri()
+ final int shutdownPort = clients.get(randomClientId).getGossipManager().getMyself().getUri()
.getPort();
- final String shutdownId = clients.get(randomClientId).get_gossipManager().getMyself().getId();
+ final String shutdownId = clients.get(randomClientId).getGossipManager().getMyself().getId();
clients.get(randomClientId).shutdown();
TUnit.assertThat(new Callable<Integer>() {
public Integer call() throws Exception {
int total = 0;
for (int i = 0; i < clusterMembers; ++i) {
- total += clients.get(i).get_gossipManager().getLiveMembers().size();
+ total += clients.get(i).getGossipManager().getLiveMembers().size();
}
return total;
}
@@ -107,7 +107,7 @@ public class ShutdownDeadtimeTest {
public Integer call() throws Exception {
int total = 0;
for (int i = 0; i < clusterMembers - 1; ++i) {
- total += clients.get(i).get_gossipManager().getDeadList().size();
+ total += clients.get(i).getGossipManager().getDeadList().size();
}
return total;
}
@@ -130,7 +130,7 @@ public class ShutdownDeadtimeTest {
public Integer call() throws Exception {
int total = 0;
for (int i = 0; i < clusterMembers; ++i) {
- total += clients.get(i).get_gossipManager().getLiveMembers().size();
+ total += clients.get(i).getGossipManager().getLiveMembers().size();
}
return total;
}
http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/f35dddd8/src/test/java/org/apache/gossip/StartupSettingsTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/gossip/StartupSettingsTest.java b/src/test/java/org/apache/gossip/StartupSettingsTest.java
index 3a52fc7..3b62836 100644
--- a/src/test/java/org/apache/gossip/StartupSettingsTest.java
+++ b/src/test/java/org/apache/gossip/StartupSettingsTest.java
@@ -62,7 +62,7 @@ public class StartupSettingsTest {
TUnit.assertThat(new Callable<Integer> (){
public Integer call() throws Exception {
- return firstService.get_gossipManager().getLiveMembers().size();
+ return firstService.getGossipManager().getLiveMembers().size();
}}).afterWaitingAtMost(30, TimeUnit.SECONDS).isEqualTo(0);
final GossipService serviceUnderTest = new GossipService(
StartupSettings.fromJSONFile( settingsFile )
@@ -70,7 +70,7 @@ public class StartupSettingsTest {
serviceUnderTest.start();
TUnit.assertThat(new Callable<Integer> (){
public Integer call() throws Exception {
- return serviceUnderTest.get_gossipManager().getLiveMembers().size();
+ return serviceUnderTest.getGossipManager().getLiveMembers().size();
}}).afterWaitingAtMost(10, TimeUnit.SECONDS).isEqualTo(1);
firstService.shutdown();
serviceUnderTest.shutdown();
http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/f35dddd8/src/test/java/org/apache/gossip/TenNodeThreeSeedTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/gossip/TenNodeThreeSeedTest.java b/src/test/java/org/apache/gossip/TenNodeThreeSeedTest.java
index 0faa968..e72ec67 100644
--- a/src/test/java/org/apache/gossip/TenNodeThreeSeedTest.java
+++ b/src/test/java/org/apache/gossip/TenNodeThreeSeedTest.java
@@ -82,7 +82,7 @@ public class TenNodeThreeSeedTest {
public Integer call() throws Exception {
int total = 0;
for (int i = 0; i < clusterMembers; ++i) {
- total += clients.get(i).get_gossipManager().getLiveMembers().size();
+ total += clients.get(i).getGossipManager().getLiveMembers().size();
}
return total;
}}).afterWaitingAtMost(20, TimeUnit.SECONDS).isEqualTo(20);
http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/f35dddd8/src/test/java/org/apache/gossip/manager/DataReaperTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/gossip/manager/DataReaperTest.java b/src/test/java/org/apache/gossip/manager/DataReaperTest.java
new file mode 100644
index 0000000..4cd5dfe
--- /dev/null
+++ b/src/test/java/org/apache/gossip/manager/DataReaperTest.java
@@ -0,0 +1,55 @@
+package org.apache.gossip.manager;
+
+import java.net.URI;
+
+import org.apache.gossip.GossipSettings;
+import org.apache.gossip.manager.random.RandomGossipManager;
+import org.apache.gossip.model.GossipDataMessage;
+import org.junit.Assert;
+import org.junit.Test;
+
+import io.teknek.tunit.TUnit;
+
+public class DataReaperTest {
+
+ @Test
+ public void testReaperOneShot() {
+ String myId = "4";
+ String key = "key";
+ String value = "a";
+ GossipSettings settings = new GossipSettings();
+ GossipManager gm = RandomGossipManager.newBuilder().cluster("abc").settings(settings)
+ .withId(myId).uri(URI.create("udp://localhost:5000")).build();
+ gm.gossipPerNodeData(perNodeDatum(key, value));
+ Assert.assertEquals(value, gm.findGossipData(myId, key).getPayload());
+ gm.getDataReaper().runOnce();
+ TUnit.assertThat(() -> gm.findGossipData(myId, key)).equals(null);
+ }
+
+ private GossipDataMessage perNodeDatum(String key, String value) {
+ GossipDataMessage m = new GossipDataMessage();
+ m.setExpireAt(System.currentTimeMillis() + 5L);
+ m.setKey(key);
+ m.setPayload(value);
+ m.setTimestamp(System.currentTimeMillis());
+ return m;
+ }
+
+ @Test
+ public void testHigherTimestampWins() {
+ String myId = "4";
+ String key = "key";
+ String value = "a";
+ GossipSettings settings = new GossipSettings();
+ GossipManager gm = RandomGossipManager.newBuilder().cluster("abc").settings(settings)
+ .withId(myId).uri(URI.create("udp://localhost:5000")).build();
+ GossipDataMessage before = perNodeDatum(key, value);
+ GossipDataMessage after = perNodeDatum(key, "b");
+ after.setTimestamp(after.getTimestamp() - 1);
+ gm.gossipPerNodeData(before);
+ Assert.assertEquals(value, gm.findGossipData(myId, key).getPayload());
+ gm.gossipPerNodeData(after);
+ Assert.assertEquals(value, gm.findGossipData(myId, key).getPayload());
+ }
+
+}