You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gossip.apache.org by ec...@apache.org on 2016/06/11 00:16:38 UTC
[01/12] incubator-gossip git commit: Move to URI in model and configuration
Repository: incubator-gossip
Updated Branches:
refs/heads/master fe196cd78 -> 2c1dc4375
Move to URI in model and configuration
Project: http://git-wip-us.apache.org/repos/asf/incubator-gossip/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gossip/commit/5532585e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gossip/tree/5532585e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gossip/diff/5532585e
Branch: refs/heads/master
Commit: 5532585e67feb06bb97ecd0afabf97c30403a99b
Parents: 3ca8e0f
Author: Edward Capriolo <ed...@gmail.com>
Authored: Thu Jun 2 09:23:55 2016 -0400
Committer: Edward Capriolo <ed...@gmail.com>
Committed: Thu Jun 2 09:23:55 2016 -0400
----------------------------------------------------------------------
.../java/org/apache/gossip/GossipMember.java | 35 ++++---------
.../java/org/apache/gossip/GossipRunner.java | 5 +-
.../java/org/apache/gossip/GossipService.java | 14 +++---
.../org/apache/gossip/LocalGossipMember.java | 13 +++--
.../org/apache/gossip/RemoteGossipMember.java | 19 +++----
.../java/org/apache/gossip/StartupSettings.java | 53 +++++++++-----------
.../apache/gossip/examples/GossipExample.java | 16 +++---
.../apache/gossip/manager/GossipManager.java | 11 ++--
.../gossip/manager/PassiveGossipThread.java | 33 +++++++-----
.../OnlyProcessReceivedPassiveGossipThread.java | 33 ++++++------
.../impl/SendMembersActiveGossipThread.java | 7 ++-
.../manager/random/RandomGossipManager.java | 5 +-
.../org/apache/gossip/model/GossipMember.java | 28 ++++-------
.../io/teknek/gossip/ShutdownDeadtimeTest.java | 15 ++++--
.../io/teknek/gossip/StartupSettingsTest.java | 11 ++--
.../io/teknek/gossip/TenNodeThreeSeedTest.java | 17 ++++---
16 files changed, 153 insertions(+), 162 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/5532585e/src/main/java/org/apache/gossip/GossipMember.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/gossip/GossipMember.java b/src/main/java/org/apache/gossip/GossipMember.java
index fd44ddd..dbc84b2 100644
--- a/src/main/java/org/apache/gossip/GossipMember.java
+++ b/src/main/java/org/apache/gossip/GossipMember.java
@@ -18,6 +18,7 @@
package org.apache.gossip;
import java.net.InetSocketAddress;
+import java.net.URI;
/**
* A abstract class representing a gossip member.
@@ -27,9 +28,7 @@ import java.net.InetSocketAddress;
public abstract class GossipMember implements Comparable<GossipMember> {
- protected final String host;
-
- protected final int port;
+ protected final URI uri;
protected volatile long heartbeat;
@@ -54,12 +53,11 @@ public abstract class GossipMember implements Comparable<GossipMember> {
* @param id
* an id that may be replaced after contact
*/
- public GossipMember(String clusterName, String host, int port, String id, long heartbeat) {
+ public GossipMember(String clusterName, URI uri, String id, long heartbeat) {
this.clusterName = clusterName;
- this.host = host;
- this.port = port;
this.id = id;
this.heartbeat = heartbeat;
+ this.uri = uri;
}
/**
@@ -71,30 +69,13 @@ public abstract class GossipMember implements Comparable<GossipMember> {
return clusterName;
}
- /**
- * Get the hostname or IP address of the remote gossip member.
- *
- * @return The hostname or IP address.
- */
- public String getHost() {
- return host;
- }
-
- /**
- * Get the port number of the remote gossip member.
- *
- * @return The port number.
- */
- public int getPort() {
- return port;
- }
-
+
/**
* The member address in the form IP/host:port Similar to the toString in
* {@link InetSocketAddress}
*/
public String getAddress() {
- return host + ":" + port;
+ return uri.getHost() + ":" + uri.getPort();
}
/**
@@ -141,6 +122,10 @@ public abstract class GossipMember implements Comparable<GossipMember> {
return result;
}
+ public URI getUri() {
+ return uri;
+ }
+
/**
* @see java.lang.Object#equals(java.lang.Object)
*/
http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/5532585e/src/main/java/org/apache/gossip/GossipRunner.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/gossip/GossipRunner.java b/src/main/java/org/apache/gossip/GossipRunner.java
index d995cce..c765ed6 100644
--- a/src/main/java/org/apache/gossip/GossipRunner.java
+++ b/src/main/java/org/apache/gossip/GossipRunner.java
@@ -20,12 +20,13 @@ package org.apache.gossip;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
+import java.net.URISyntaxException;
import org.json.JSONException;
public class GossipRunner {
- public static void main(String[] args) {
+ public static void main(String[] args) throws URISyntaxException {
File configFile;
if (args.length == 1) {
configFile = new File("./" + args[0]);
@@ -35,7 +36,7 @@ public class GossipRunner {
new GossipRunner(configFile);
}
- public GossipRunner(File configFile) {
+ public GossipRunner(File configFile) throws URISyntaxException {
if (configFile != null && configFile.exists()) {
try {
System.out.println("Parsing the configuration file...");
http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/5532585e/src/main/java/org/apache/gossip/GossipService.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/gossip/GossipService.java b/src/main/java/org/apache/gossip/GossipService.java
index 9db740e..3175706 100644
--- a/src/main/java/org/apache/gossip/GossipService.java
+++ b/src/main/java/org/apache/gossip/GossipService.java
@@ -18,6 +18,7 @@
package org.apache.gossip;
import java.net.InetAddress;
+import java.net.URI;
import java.net.UnknownHostException;
import java.util.List;
@@ -45,8 +46,8 @@ public class GossipService {
*/
public GossipService(StartupSettings startupSettings) throws InterruptedException,
UnknownHostException {
- this(startupSettings.getCluster(), InetAddress.getLocalHost().getHostAddress(), startupSettings
- .getPort(), startupSettings.getId(), startupSettings.getGossipMembers(),
+ this(startupSettings.getCluster(), startupSettings.getUri()
+ , startupSettings.getId(), startupSettings.getGossipMembers(),
startupSettings.getGossipSettings(), null);
}
@@ -56,18 +57,15 @@ public class GossipService {
* @throws InterruptedException
* @throws UnknownHostException
*/
- public GossipService(String cluster, String ipAddress, int port, String id,
+ public GossipService(String cluster, URI uri, String id,
List<GossipMember> gossipMembers, GossipSettings settings, GossipListener listener)
throws InterruptedException, UnknownHostException {
- gossipManager = new RandomGossipManager(cluster, ipAddress, port, id, settings, gossipMembers,
+ gossipManager = new RandomGossipManager(cluster, uri, id, settings, gossipMembers,
listener);
}
public void start() {
- String address = get_gossipManager().getMyself().getHost() + ":"
- + get_gossipManager().getMyself().getPort();
- LOGGER.debug("Starting: " + gossipManager.getName() + " - " + address);
-
+ LOGGER.debug("Starting: " + gossipManager.getName() + " - " + get_gossipManager().getMyself().getUri());
gossipManager.start();
}
http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/5532585e/src/main/java/org/apache/gossip/LocalGossipMember.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/gossip/LocalGossipMember.java b/src/main/java/org/apache/gossip/LocalGossipMember.java
index 55ce257..d7e9f4e 100644
--- a/src/main/java/org/apache/gossip/LocalGossipMember.java
+++ b/src/main/java/org/apache/gossip/LocalGossipMember.java
@@ -17,6 +17,8 @@
*/
package org.apache.gossip;
+import java.net.URI;
+
import javax.management.NotificationListener;
/**
@@ -32,10 +34,8 @@ public class LocalGossipMember extends GossipMember {
/**
* Constructor.
*
- * @param hostname
- * The hostname or IP address.
- * @param port
- * The port number.
+ * @param uri
+ * The uri of the member
* @param id
* @param heartbeat
* The current heartbeat.
@@ -43,10 +43,9 @@ public class LocalGossipMember extends GossipMember {
* @param cleanupTimeout
* The cleanup timeout for this gossip member.
*/
- public LocalGossipMember(String clusterName, String hostname, int port, String id,
+ public LocalGossipMember(String clusterName, URI uri, String id,
long heartbeat, NotificationListener notificationListener, int cleanupTimeout) {
- super(clusterName, hostname, port, id, heartbeat);
-
+ super(clusterName, uri, id, heartbeat);
timeoutTimer = new GossipTimeoutTimer(cleanupTimeout, notificationListener, this);
}
http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/5532585e/src/main/java/org/apache/gossip/RemoteGossipMember.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/gossip/RemoteGossipMember.java b/src/main/java/org/apache/gossip/RemoteGossipMember.java
index 899da93..88c568a 100644
--- a/src/main/java/org/apache/gossip/RemoteGossipMember.java
+++ b/src/main/java/org/apache/gossip/RemoteGossipMember.java
@@ -17,6 +17,8 @@
*/
package org.apache.gossip;
+import java.net.URI;
+
/**
* The object represents a gossip member with the properties as received from a remote gossip
* member.
@@ -35,19 +37,12 @@ public class RemoteGossipMember extends GossipMember {
* @param heartbeat
* The current heartbeat.
*/
- public RemoteGossipMember(String clusterName, String hostname, int port, String id, long heartbeat) {
- super(clusterName, hostname, port, id, heartbeat);
+ public RemoteGossipMember(String clusterName, URI uri, String id, long heartbeat) {
+ super(clusterName, uri, id, heartbeat);
}
- /**
- * Construct a RemoteGossipMember with a heartbeat of 0.
- *
- * @param hostname
- * The hostname or IP address.
- * @param port
- * The port number.
- */
- public RemoteGossipMember(String clusterName, String hostname, int port, String id) {
- super(clusterName, hostname, port, id, System.currentTimeMillis());
+ public RemoteGossipMember(String clusterName, URI uri, String id) {
+ super(clusterName, uri, id, System.currentTimeMillis());
}
+
}
http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/5532585e/src/main/java/org/apache/gossip/StartupSettings.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/gossip/StartupSettings.java b/src/main/java/org/apache/gossip/StartupSettings.java
index 176a79b..9475536 100644
--- a/src/main/java/org/apache/gossip/StartupSettings.java
+++ b/src/main/java/org/apache/gossip/StartupSettings.java
@@ -22,6 +22,8 @@ import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileReader;
import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.List;
@@ -41,9 +43,8 @@ public class StartupSettings {
/** The id to use fo the service */
private String id;
- /** The port to start the gossip service on. */
- private int port;
-
+ private URI uri;
+
private String cluster;
/** The gossip settings used at startup. */
@@ -62,8 +63,16 @@ public class StartupSettings {
* @param logLevel
* unused
*/
- public StartupSettings(String id, int port, int logLevel, String cluster) {
- this(id, port, new GossipSettings(), cluster);
+ public StartupSettings(String id, URI uri, int logLevel, String cluster) {
+ this(id, uri, new GossipSettings(), cluster);
+ }
+
+ public URI getUri() {
+ return uri;
+ }
+
+ public void setUri(URI uri) {
+ this.uri = uri;
}
/**
@@ -74,9 +83,9 @@ public class StartupSettings {
* @param port
* The port to start the service on.
*/
- public StartupSettings(String id, int port, GossipSettings gossipSettings, String cluster) {
+ public StartupSettings(String id, URI uri, GossipSettings gossipSettings, String cluster) {
this.id = id;
- this.port = port;
+ this.uri = uri;
this.gossipSettings = gossipSettings;
this.setCluster(cluster);
gossipMembers = new ArrayList<>();
@@ -110,25 +119,6 @@ public class StartupSettings {
}
/**
- * Set the port of the gossip service.
- *
- * @param port
- * The port for the gossip service.
- */
- public void setPort(int port) {
- this.port = port;
- }
-
- /**
- * Get the port for the gossip service.
- *
- * @return The port of the gossip service.
- */
- public int getPort() {
- return port;
- }
-
- /**
* Get the GossipSettings.
*
* @return The GossipSettings object.
@@ -168,9 +158,10 @@ public class StartupSettings {
* Thrown when the file cannot be found.
* @throws IOException
* Thrown when reading the file gives problems.
+ * @throws URISyntaxException
*/
public static StartupSettings fromJSONFile(File jsonFile) throws JSONException,
- FileNotFoundException, IOException {
+ FileNotFoundException, IOException, URISyntaxException {
// Read the file to a String.
StringBuffer buffer = new StringBuffer();
try (BufferedReader br = new BufferedReader(new FileReader(jsonFile)) ){
@@ -181,7 +172,7 @@ public class StartupSettings {
}
JSONObject jsonObject = new JSONArray(buffer.toString()).getJSONObject(0);
- int port = jsonObject.getInt("port");
+ String uri = jsonObject.getString("uri");
String id = jsonObject.getString("id");
int gossipInterval = jsonObject.getInt("gossip_interval");
int cleanupInterval = jsonObject.getInt("cleanup_interval");
@@ -189,7 +180,8 @@ public class StartupSettings {
if (cluster == null){
throw new IllegalArgumentException("cluster was null. It is required");
}
- StartupSettings settings = new StartupSettings(id, port, new GossipSettings(gossipInterval,
+ URI uri2 = new URI(uri);
+ StartupSettings settings = new StartupSettings(id, uri2, new GossipSettings(gossipInterval,
cleanupInterval), cluster);
// Now iterate over the members from the config file and add them to the settings.
@@ -197,8 +189,9 @@ public class StartupSettings {
JSONArray membersJSON = jsonObject.getJSONArray("members");
for (int i = 0; i < membersJSON.length(); i++) {
JSONObject memberJSON = membersJSON.getJSONObject(i);
+ URI uri3 = new URI(memberJSON.getString("uri"));
RemoteGossipMember member = new RemoteGossipMember(memberJSON.getString("cluster"),
- memberJSON.getString("host"), memberJSON.getInt("port"), "");
+ uri3, "", 0);
settings.addGossipMember(member);
configMembersDetails += member.getAddress();
if (i < (membersJSON.length() - 1))
http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/5532585e/src/main/java/org/apache/gossip/examples/GossipExample.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/gossip/examples/GossipExample.java b/src/main/java/org/apache/gossip/examples/GossipExample.java
index e953c77..cea59f4 100644
--- a/src/main/java/org/apache/gossip/examples/GossipExample.java
+++ b/src/main/java/org/apache/gossip/examples/GossipExample.java
@@ -18,6 +18,8 @@
package org.apache.gossip.examples;
import java.net.InetAddress;
+import java.net.URI;
+import java.net.URISyntaxException;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.List;
@@ -57,26 +59,28 @@ public class GossipExample extends Thread {
public void run() {
try {
GossipSettings settings = new GossipSettings();
-
List<GossipService> clients = new ArrayList<>();
-
- // Get my ip address.
String myIpAddress = InetAddress.getLocalHost().getHostAddress();
-
String cluster = "My Gossip Cluster";
// Create the gossip members and put them in a list and give them a port number starting with
// 2000.
List<GossipMember> startupMembers = new ArrayList<>();
for (int i = 0; i < NUMBER_OF_CLIENTS; ++i) {
- startupMembers.add(new RemoteGossipMember(cluster, myIpAddress, 2000 + i, ""));
+ URI u;
+ try {
+ u = new URI("udp://" + myIpAddress + ":" + (2000 + i));
+ } catch (URISyntaxException e) {
+ throw new RuntimeException(e);
+ }
+ startupMembers.add(new RemoteGossipMember(cluster, u, "", 0 ));
}
// Lets start the gossip clients.
// Start the clients, waiting cleaning-interval + 1 second between them which will show the
// dead list handling.
for (GossipMember member : startupMembers) {
- GossipService gossipService = new GossipService(cluster, myIpAddress, member.getPort(), "",
+ GossipService gossipService = new GossipService(cluster, member.getUri(), "",
startupMembers, settings, null);
clients.add(gossipService);
gossipService.start();
http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/5532585e/src/main/java/org/apache/gossip/manager/GossipManager.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/gossip/manager/GossipManager.java b/src/main/java/org/apache/gossip/manager/GossipManager.java
index 80cadf7..363a4a9 100644
--- a/src/main/java/org/apache/gossip/manager/GossipManager.java
+++ b/src/main/java/org/apache/gossip/manager/GossipManager.java
@@ -18,6 +18,8 @@
package org.apache.gossip.manager;
import java.lang.reflect.InvocationTargetException;
+import java.net.BindException;
+import java.net.URI;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
@@ -68,18 +70,18 @@ public abstract class GossipManager extends Thread implements NotificationListen
public GossipManager(Class<? extends PassiveGossipThread> passiveGossipThreadClass,
Class<? extends ActiveGossipThread> activeGossipThreadClass, String cluster,
- String address, int port, String id, GossipSettings settings,
+ URI uri, String id, GossipSettings settings,
List<GossipMember> gossipMembers, GossipListener listener) {
this.passiveGossipThreadClass = passiveGossipThreadClass;
this.activeGossipThreadClass = activeGossipThreadClass;
this.settings = settings;
- me = new LocalGossipMember(cluster, address, port, id, System.currentTimeMillis(), this,
+ me = new LocalGossipMember(cluster, uri, id, System.currentTimeMillis(), this,
settings.getCleanupInterval());
members = new ConcurrentSkipListMap<>();
for (GossipMember startupMember : gossipMembers) {
if (!startupMember.equals(me)) {
LocalGossipMember member = new LocalGossipMember(startupMember.getClusterName(),
- startupMember.getHost(), startupMember.getPort(), startupMember.getId(),
+ startupMember.getUri(), startupMember.getId(),
System.currentTimeMillis(), this, settings.getCleanupInterval());
members.put(member, GossipState.UP);
GossipService.LOGGER.debug(member);
@@ -180,6 +182,9 @@ public abstract class GossipManager extends Thread implements NotificationListen
gossipThreadExecutor.execute(activeGossipThread);
} catch (InstantiationException | IllegalAccessException | IllegalArgumentException
| InvocationTargetException | NoSuchMethodException | SecurityException e1) {
+ if (e1 instanceof BindException){
+ LOGGER.fatal("could not bind to "+ me.getUri() + " " + me.getAddress());
+ }
throw new RuntimeException(e1);
}
GossipService.LOGGER.debug("The GossipService is started.");
http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/5532585e/src/main/java/org/apache/gossip/manager/PassiveGossipThread.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/gossip/manager/PassiveGossipThread.java b/src/main/java/org/apache/gossip/manager/PassiveGossipThread.java
index bd7354e..a057e7d 100644
--- a/src/main/java/org/apache/gossip/manager/PassiveGossipThread.java
+++ b/src/main/java/org/apache/gossip/manager/PassiveGossipThread.java
@@ -23,6 +23,8 @@ import java.net.DatagramSocket;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.net.SocketException;
+import java.net.URI;
+import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -58,18 +60,18 @@ abstract public class PassiveGossipThread implements Runnable {
public PassiveGossipThread(GossipManager gossipManager) {
this.gossipManager = gossipManager;
try {
- SocketAddress socketAddress = new InetSocketAddress(gossipManager.getMyself().getHost(),
- gossipManager.getMyself().getPort());
+ SocketAddress socketAddress = new InetSocketAddress(gossipManager.getMyself().getUri().getHost(),
+ gossipManager.getMyself().getUri().getPort());
server = new DatagramSocket(socketAddress);
- GossipService.LOGGER.debug("Gossip service successfully initialized on port "
- + gossipManager.getMyself().getPort());
- GossipService.LOGGER.debug("I am " + gossipManager.getMyself());
+ LOGGER.debug("Gossip service successfully initialized on port "
+ + gossipManager.getMyself().getUri().getPort());
+ LOGGER.debug("I am " + gossipManager.getMyself());
cluster = gossipManager.getMyself().getClusterName();
if (cluster == null){
throw new IllegalArgumentException("cluster was null");
}
} catch (SocketException ex) {
- GossipService.LOGGER.warn(ex);
+ LOGGER.warn(ex);
throw new RuntimeException(ex);
}
keepRunning = new AtomicBoolean(true);
@@ -103,14 +105,20 @@ abstract public class PassiveGossipThread implements Runnable {
ActiveGossipMessage activeGossipMessage = MAPPER.readValue(json_bytes,
ActiveGossipMessage.class);
for (int i = 0; i < activeGossipMessage.getMembers().size(); i++) {
+ URI u = null;
+ try {
+ u = new URI(activeGossipMessage.getMembers().get(i).getUri());
+ } catch (URISyntaxException e) {
+ LOGGER.debug("Gossip message with faulty URI", e);
+ continue;
+ }
RemoteGossipMember member = new RemoteGossipMember(
activeGossipMessage.getMembers().get(i).getCluster(),
- activeGossipMessage.getMembers().get(i).getHost(),
- activeGossipMessage.getMembers().get(i).getPort(),
+ u,
activeGossipMessage.getMembers().get(i).getId(),
activeGossipMessage.getMembers().get(i).getHeartbeat());
if (!(member.getClusterName().equals(cluster))){
- GossipService.LOGGER.warn("Note a member of this cluster " + i);
+ LOGGER.warn("Note a member of this cluster " + i);
continue;
}
// This is the first member found, so this should be the member who is communicating
@@ -122,16 +130,15 @@ abstract public class PassiveGossipThread implements Runnable {
}
mergeLists(gossipManager, senderMember, remoteGossipMembers);
} catch (RuntimeException ex) {
- GossipService.LOGGER.error("Unable to process message", ex);
+ LOGGER.error("Unable to process message", ex);
}
} else {
- GossipService.LOGGER
+ LOGGER
.error("The received message is not of the expected size, it has been dropped.");
}
} catch (IOException e) {
- GossipService.LOGGER.error(e);
- System.out.println(e);
+ LOGGER.error(e);
keepRunning.set(false);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/5532585e/src/main/java/org/apache/gossip/manager/impl/OnlyProcessReceivedPassiveGossipThread.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/gossip/manager/impl/OnlyProcessReceivedPassiveGossipThread.java b/src/main/java/org/apache/gossip/manager/impl/OnlyProcessReceivedPassiveGossipThread.java
index edf21f3..d0acfc1 100644
--- a/src/main/java/org/apache/gossip/manager/impl/OnlyProcessReceivedPassiveGossipThread.java
+++ b/src/main/java/org/apache/gossip/manager/impl/OnlyProcessReceivedPassiveGossipThread.java
@@ -25,8 +25,11 @@ import org.apache.gossip.LocalGossipMember;
import org.apache.gossip.RemoteGossipMember;
import org.apache.gossip.manager.GossipManager;
import org.apache.gossip.manager.PassiveGossipThread;
+import org.apache.log4j.Logger;
public class OnlyProcessReceivedPassiveGossipThread extends PassiveGossipThread {
+
+ public static final Logger LOGGER = Logger.getLogger(OnlyProcessReceivedPassiveGossipThread.class);
public OnlyProcessReceivedPassiveGossipThread(GossipManager gossipManager) {
super(gossipManager);
@@ -47,9 +50,9 @@ public class OnlyProcessReceivedPassiveGossipThread extends PassiveGossipThread
// if the person sending to us is in the dead list consider them up
for (LocalGossipMember i : gossipManager.getDeadList()) {
if (i.getId().equals(senderMember.getId())) {
- System.out.println(gossipManager.getMyself() + " caught a live one!");
+ LOGGER.info(gossipManager.getMyself() + " contacted by dead member " + senderMember.getUri());
LocalGossipMember newLocalMember = new LocalGossipMember(senderMember.getClusterName(),
- senderMember.getHost(), senderMember.getPort(), senderMember.getId(),
+ senderMember.getUri(), senderMember.getId(),
senderMember.getHeartbeat(), gossipManager, gossipManager.getSettings()
.getCleanupInterval());
gossipManager.revivieMember(newLocalMember);
@@ -70,7 +73,7 @@ public class OnlyProcessReceivedPassiveGossipThread extends PassiveGossipThread
} else if (!gossipManager.getMemberList().contains(remoteMember)
&& !gossipManager.getDeadList().contains(remoteMember)) {
LocalGossipMember newLocalMember = new LocalGossipMember(remoteMember.getClusterName(),
- remoteMember.getHost(), remoteMember.getPort(), remoteMember.getId(),
+ remoteMember.getUri(), remoteMember.getId(),
remoteMember.getHeartbeat(), gossipManager, gossipManager.getSettings()
.getCleanupInterval());
gossipManager.createOrRevivieMember(newLocalMember);
@@ -81,26 +84,26 @@ public class OnlyProcessReceivedPassiveGossipThread extends PassiveGossipThread
gossipManager.getDeadList().indexOf(remoteMember));
if (remoteMember.getHeartbeat() > localDeadMember.getHeartbeat()) {
LocalGossipMember newLocalMember = new LocalGossipMember(remoteMember.getClusterName(),
- remoteMember.getHost(), remoteMember.getPort(), remoteMember.getId(),
+ remoteMember.getUri(), remoteMember.getId(),
remoteMember.getHeartbeat(), gossipManager, gossipManager.getSettings()
.getCleanupInterval());
gossipManager.revivieMember(newLocalMember);
newLocalMember.startTimeoutTimer();
- GossipService.LOGGER.debug("Removed remote member " + remoteMember.getAddress()
+ LOGGER.debug("Removed remote member " + remoteMember.getAddress()
+ " from dead list and added to local member list.");
} else {
- GossipService.LOGGER.debug("me " + gossipManager.getMyself());
- GossipService.LOGGER.debug("sender " + senderMember);
- GossipService.LOGGER.debug("remote " + remoteList);
- GossipService.LOGGER.debug("live " + gossipManager.getMemberList());
- GossipService.LOGGER.debug("dead " + gossipManager.getDeadList());
+ LOGGER.debug("me " + gossipManager.getMyself());
+ LOGGER.debug("sender " + senderMember);
+ LOGGER.debug("remote " + remoteList);
+ LOGGER.debug("live " + gossipManager.getMemberList());
+ LOGGER.debug("dead " + gossipManager.getDeadList());
}
} else {
- GossipService.LOGGER.debug("me " + gossipManager.getMyself());
- GossipService.LOGGER.debug("sender " + senderMember);
- GossipService.LOGGER.debug("remote " + remoteList);
- GossipService.LOGGER.debug("live " + gossipManager.getMemberList());
- GossipService.LOGGER.debug("dead " + gossipManager.getDeadList());
+ LOGGER.debug("me " + gossipManager.getMyself());
+ LOGGER.debug("sender " + senderMember);
+ LOGGER.debug("remote " + remoteList);
+ LOGGER.debug("live " + gossipManager.getMemberList());
+ LOGGER.debug("dead " + gossipManager.getDeadList());
// throw new IllegalArgumentException("wtf");
}
}
http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/5532585e/src/main/java/org/apache/gossip/manager/impl/SendMembersActiveGossipThread.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/gossip/manager/impl/SendMembersActiveGossipThread.java b/src/main/java/org/apache/gossip/manager/impl/SendMembersActiveGossipThread.java
index 16d0d32..c296156 100644
--- a/src/main/java/org/apache/gossip/manager/impl/SendMembersActiveGossipThread.java
+++ b/src/main/java/org/apache/gossip/manager/impl/SendMembersActiveGossipThread.java
@@ -44,9 +44,8 @@ abstract public class SendMembersActiveGossipThread extends ActiveGossipThread {
GossipMember gm = new GossipMember();
gm.setCluster(member.getClusterName());
gm.setHeartbeat(member.getHeartbeat());
- gm.setHost(member.getHost());
+ gm.setUri(member.getUri().toASCIIString());
gm.setId(member.getId());
- gm.setPort(member.getPort());
return gm;
}
@@ -62,7 +61,7 @@ abstract public class SendMembersActiveGossipThread extends ActiveGossipThread {
}
try (DatagramSocket socket = new DatagramSocket()) {
socket.setSoTimeout(gossipManager.getSettings().getGossipInterval());
- InetAddress dest = InetAddress.getByName(member.getHost());
+ InetAddress dest = InetAddress.getByName(member.getUri().getHost());
ActiveGossipMessage message = new ActiveGossipMessage();
message.getMembers().add(convert(me));
for (LocalGossipMember other : memberList) {
@@ -72,7 +71,7 @@ abstract public class SendMembersActiveGossipThread extends ActiveGossipThread {
int packet_length = json_bytes.length;
if (packet_length < GossipManager.MAX_PACKET_SIZE) {
byte[] buf = createBuffer(packet_length, json_bytes);
- DatagramPacket datagramPacket = new DatagramPacket(buf, buf.length, dest, member.getPort());
+ DatagramPacket datagramPacket = new DatagramPacket(buf, buf.length, dest, member.getUri().getPort());
socket.send(datagramPacket);
} else {
GossipService.LOGGER.error("The length of the to be send message is too large ("
http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/5532585e/src/main/java/org/apache/gossip/manager/random/RandomGossipManager.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/gossip/manager/random/RandomGossipManager.java b/src/main/java/org/apache/gossip/manager/random/RandomGossipManager.java
index 0122610..7aa4435 100644
--- a/src/main/java/org/apache/gossip/manager/random/RandomGossipManager.java
+++ b/src/main/java/org/apache/gossip/manager/random/RandomGossipManager.java
@@ -23,12 +23,13 @@ import org.apache.gossip.event.GossipListener;
import org.apache.gossip.manager.GossipManager;
import org.apache.gossip.manager.impl.OnlyProcessReceivedPassiveGossipThread;
+import java.net.URI;
import java.util.List;
public class RandomGossipManager extends GossipManager {
- public RandomGossipManager(String cluster, String address, int port, String id,
+ public RandomGossipManager(String cluster, URI uri, String id,
GossipSettings settings, List<GossipMember> gossipMembers, GossipListener listener) {
super(OnlyProcessReceivedPassiveGossipThread.class, RandomActiveGossipThread.class, cluster,
- address, port, id, settings, gossipMembers, listener);
+ uri, id, settings, gossipMembers, listener);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/5532585e/src/main/java/org/apache/gossip/model/GossipMember.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/gossip/model/GossipMember.java b/src/main/java/org/apache/gossip/model/GossipMember.java
index 8dc6bf7..413ab71 100644
--- a/src/main/java/org/apache/gossip/model/GossipMember.java
+++ b/src/main/java/org/apache/gossip/model/GossipMember.java
@@ -3,8 +3,7 @@ package org.apache.gossip.model;
public class GossipMember {
private String cluster;
- private String host;
- private Integer port;
+ private String uri;
private String id;
private Long heartbeat;
@@ -12,12 +11,11 @@ public class GossipMember {
}
- public GossipMember(String cluster, String host, Integer port, String id, Long heartbeat){
- this.cluster=cluster;
- this.host= host;
- this.port = port;
+ public GossipMember(String cluster, String uri, String id, Long heartbeat){
+ this.cluster = cluster;
+ this.uri = uri;
this.id = id;
-
+ this.heartbeat = heartbeat;
}
public String getCluster() {
@@ -28,20 +26,12 @@ public class GossipMember {
this.cluster = cluster;
}
- public String getHost() {
- return host;
- }
-
- public void setHost(String host) {
- this.host = host;
- }
-
- public Integer getPort() {
- return port;
+ public String getUri() {
+ return uri;
}
- public void setPort(Integer port) {
- this.port = port;
+ public void setUri(String uri) {
+ this.uri = uri;
}
public String getId() {
http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/5532585e/src/test/java/io/teknek/gossip/ShutdownDeadtimeTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/io/teknek/gossip/ShutdownDeadtimeTest.java b/src/test/java/io/teknek/gossip/ShutdownDeadtimeTest.java
index 2d8190b..340886a 100644
--- a/src/test/java/io/teknek/gossip/ShutdownDeadtimeTest.java
+++ b/src/test/java/io/teknek/gossip/ShutdownDeadtimeTest.java
@@ -19,6 +19,8 @@ package io.teknek.gossip;
import io.teknek.tunit.TUnit;
+import java.net.URI;
+import java.net.URISyntaxException;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.List;
@@ -43,7 +45,7 @@ public class ShutdownDeadtimeTest {
private static final Logger log = Logger.getLogger(ShutdownDeadtimeTest.class );
@Test
//@Ignore
- public void DeadNodesDoNotComeAliveAgain() throws InterruptedException, UnknownHostException {
+ public void DeadNodesDoNotComeAliveAgain() throws InterruptedException, UnknownHostException, URISyntaxException {
GossipSettings settings = new GossipSettings(1000, 10000);
String cluster = UUID.randomUUID().toString();
@@ -51,7 +53,8 @@ public class ShutdownDeadtimeTest {
int seedNodes = 3;
List<GossipMember> startupMembers = new ArrayList<>();
for (int i = 1; i < seedNodes + 1; ++i) {
- startupMembers.add(new RemoteGossipMember(cluster, "127.0.0.1", 50000 + i, i + ""));
+ URI uri = new URI("udp://" + "127.0.0.1" + ":" + (50000 + i));
+ startupMembers.add(new RemoteGossipMember(cluster, uri, i + ""));
}
log.info( "Adding clients" );
@@ -59,7 +62,8 @@ public class ShutdownDeadtimeTest {
final int clusterMembers = 5;
for (int i = 1; i < clusterMembers+1; ++i) {
final int j = i;
- GossipService gossipService = new GossipService(cluster, "127.0.0.1", 50000 + i, i + "",
+ URI uri = new URI("udp://" + "127.0.0.1" + ":" + (50000 + i));
+ GossipService gossipService = new GossipService(cluster, uri, i + "",
startupMembers, settings,
new GossipListener(){
@Override
@@ -83,7 +87,7 @@ public class ShutdownDeadtimeTest {
Random r = new Random();
int randomClientId = r.nextInt(clusterMembers);
log.info( "shutting down " + randomClientId );
- final int shutdownPort = clients.get(randomClientId).get_gossipManager().getMyself().getPort();
+ final int shutdownPort = clients.get(randomClientId).get_gossipManager().getMyself().getUri().getPort();
final String shutdownId = clients.get(randomClientId).get_gossipManager().getMyself().getId();
clients.get(randomClientId).shutdown();
TUnit.assertThat(new Callable<Integer> (){
@@ -105,8 +109,9 @@ public class ShutdownDeadtimeTest {
return total;
}}).afterWaitingAtMost(10, TimeUnit.SECONDS).isEqualTo(4);
+ URI uri = new URI("udp://" + "127.0.0.1" + ":" + shutdownPort);
// start client again
- GossipService gossipService = new GossipService(cluster, "127.0.0.1", shutdownPort, shutdownId + "",
+ GossipService gossipService = new GossipService(cluster, uri, shutdownId + "",
startupMembers, settings,
new GossipListener(){
@Override
http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/5532585e/src/test/java/io/teknek/gossip/StartupSettingsTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/io/teknek/gossip/StartupSettingsTest.java b/src/test/java/io/teknek/gossip/StartupSettingsTest.java
index aa4e404..a4a9011 100644
--- a/src/test/java/io/teknek/gossip/StartupSettingsTest.java
+++ b/src/test/java/io/teknek/gossip/StartupSettingsTest.java
@@ -30,6 +30,8 @@ import io.teknek.tunit.TUnit;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.UUID;
import java.util.concurrent.Callable;
@@ -44,13 +46,14 @@ public class StartupSettingsTest {
private static final String CLUSTER = UUID.randomUUID().toString();
@Test
- public void testUsingSettingsFile() throws IOException, InterruptedException, JSONException {
+ public void testUsingSettingsFile() throws IOException, InterruptedException, JSONException, URISyntaxException {
File settingsFile = File.createTempFile("gossipTest",".json");
log.debug( "Using settings file: " + settingsFile.getAbsolutePath() );
settingsFile.deleteOnExit();
writeSettingsFile(settingsFile);
+ URI uri = new URI("udp://" + "127.0.0.1" + ":" + 50000);
final GossipService firstService = new GossipService(
- CLUSTER, "127.0.0.1", 50000, UUID.randomUUID().toString(),
+ CLUSTER, uri, UUID.randomUUID().toString(),
new ArrayList<GossipMember>(), new GossipSettings(), null);
firstService.start();
@@ -76,11 +79,11 @@ public class StartupSettingsTest {
"[{\n" + // It is odd that this is meant to be in an array, but oh well.
" \"cluster\":\"" + CLUSTER + "\",\n" +
" \"id\":\"" + UUID.randomUUID() + "\",\n" +
- " \"port\":50001,\n" +
+ " \"uri\":\"udp://127.0.0.1:50001\",\n" +
" \"gossip_interval\":1000,\n" +
" \"cleanup_interval\":10000,\n" +
" \"members\":[\n" +
- " {\"cluster\": \"" + CLUSTER + "\",\"host\":\"127.0.0.1\", \"port\":50000}\n" +
+ " {\"cluster\": \"" + CLUSTER + "\",\"uri\":\"udp://127.0.0.1:5000\"}\n" +
" ]\n" +
"}]";
http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/5532585e/src/test/java/io/teknek/gossip/TenNodeThreeSeedTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/io/teknek/gossip/TenNodeThreeSeedTest.java b/src/test/java/io/teknek/gossip/TenNodeThreeSeedTest.java
index 4e731ae..2b5f7fe 100644
--- a/src/test/java/io/teknek/gossip/TenNodeThreeSeedTest.java
+++ b/src/test/java/io/teknek/gossip/TenNodeThreeSeedTest.java
@@ -19,6 +19,8 @@ package io.teknek.gossip;
import io.teknek.tunit.TUnit;
+import java.net.URI;
+import java.net.URISyntaxException;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.List;
@@ -41,16 +43,16 @@ public class TenNodeThreeSeedTest {
private static final Logger log = Logger.getLogger( TenNodeThreeSeedTest.class );
@Test
- public void test() throws UnknownHostException, InterruptedException{
+ public void test() throws UnknownHostException, InterruptedException, URISyntaxException{
abc();
}
@Test
- public void testAgain() throws UnknownHostException, InterruptedException{
+ public void testAgain() throws UnknownHostException, InterruptedException, URISyntaxException{
abc();
}
- public void abc() throws InterruptedException, UnknownHostException{
+ public void abc() throws InterruptedException, UnknownHostException, URISyntaxException{
GossipSettings settings = new GossipSettings();
String cluster = UUID.randomUUID().toString();
@@ -58,14 +60,16 @@ public class TenNodeThreeSeedTest {
int seedNodes = 3;
List<GossipMember> startupMembers = new ArrayList<>();
for (int i = 1; i < seedNodes+1; ++i) {
- startupMembers.add(new RemoteGossipMember(cluster, "127.0.0.1", 50000 + i, i + ""));
+ URI uri = new URI("udp://" + "127.0.0.1" + ":" + (50000 + i));
+ startupMembers.add(new RemoteGossipMember(cluster, uri, i + ""));
}
log.info( "Adding clients" );
final List<GossipService> clients = new ArrayList<>();
final int clusterMembers = 5;
for (int i = 1; i < clusterMembers+1; ++i) {
- GossipService gossipService = new GossipService(cluster, "127.0.0.1", 50000 + i, i + "",
+ URI uri = new URI("udp://" + "127.0.0.1" + ":" + (50000 + i));
+ GossipService gossipService = new GossipService(cluster, uri, i + "",
startupMembers, settings,
new GossipListener(){
@Override
@@ -75,7 +79,6 @@ public class TenNodeThreeSeedTest {
});
clients.add(gossipService);
gossipService.start();
- gossipService.get_gossipManager().getMemberList();
}
TUnit.assertThat(new Callable<Integer> (){
public Integer call() throws Exception {
@@ -84,7 +87,7 @@ public class TenNodeThreeSeedTest {
total += clients.get(i).get_gossipManager().getMemberList().size();
}
return total;
- }}).afterWaitingAtMost(10, TimeUnit.SECONDS).isEqualTo(20);
+ }}).afterWaitingAtMost(20, TimeUnit.SECONDS).isEqualTo(20);
for (int i = 0; i < clusterMembers; ++i) {
clients.get(i).shutdown();
[08/12] incubator-gossip git commit: Move to URI in model and
configuration
Posted by ec...@apache.org.
Move to URI in model and configuration
Project: http://git-wip-us.apache.org/repos/asf/incubator-gossip/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gossip/commit/ddc9a67d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gossip/tree/ddc9a67d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gossip/diff/ddc9a67d
Branch: refs/heads/master
Commit: ddc9a67dd60341c3f2326163810b2e4ede935f4e
Parents: 968203e
Author: Edward Capriolo <ed...@gmail.com>
Authored: Thu Jun 2 09:23:55 2016 -0400
Committer: Edward Capriolo <ed...@gmail.com>
Committed: Tue Jun 7 22:52:54 2016 -0400
----------------------------------------------------------------------
.../java/org/apache/gossip/GossipMember.java | 35 ++++---------
.../java/org/apache/gossip/GossipRunner.java | 5 +-
.../java/org/apache/gossip/GossipService.java | 14 +++---
.../org/apache/gossip/LocalGossipMember.java | 13 +++--
.../org/apache/gossip/RemoteGossipMember.java | 19 +++----
.../java/org/apache/gossip/StartupSettings.java | 53 +++++++++-----------
.../apache/gossip/examples/GossipExample.java | 16 +++---
.../apache/gossip/manager/GossipManager.java | 11 ++--
.../gossip/manager/PassiveGossipThread.java | 33 +++++++-----
.../OnlyProcessReceivedPassiveGossipThread.java | 33 ++++++------
.../impl/SendMembersActiveGossipThread.java | 7 ++-
.../manager/random/RandomGossipManager.java | 5 +-
.../org/apache/gossip/model/GossipMember.java | 28 ++++-------
.../io/teknek/gossip/ShutdownDeadtimeTest.java | 15 ++++--
.../io/teknek/gossip/StartupSettingsTest.java | 11 ++--
.../io/teknek/gossip/TenNodeThreeSeedTest.java | 17 ++++---
16 files changed, 153 insertions(+), 162 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/ddc9a67d/src/main/java/org/apache/gossip/GossipMember.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/gossip/GossipMember.java b/src/main/java/org/apache/gossip/GossipMember.java
index fd44ddd..dbc84b2 100644
--- a/src/main/java/org/apache/gossip/GossipMember.java
+++ b/src/main/java/org/apache/gossip/GossipMember.java
@@ -18,6 +18,7 @@
package org.apache.gossip;
import java.net.InetSocketAddress;
+import java.net.URI;
/**
* An abstract class representing a gossip member.
@@ -27,9 +28,7 @@ import java.net.InetSocketAddress;
public abstract class GossipMember implements Comparable<GossipMember> {
- protected final String host;
-
- protected final int port;
+ protected final URI uri;
protected volatile long heartbeat;
@@ -54,12 +53,11 @@ public abstract class GossipMember implements Comparable<GossipMember> {
* @param id
* an id that may be replaced after contact
*/
- public GossipMember(String clusterName, String host, int port, String id, long heartbeat) {
+ public GossipMember(String clusterName, URI uri, String id, long heartbeat) {
this.clusterName = clusterName;
- this.host = host;
- this.port = port;
this.id = id;
this.heartbeat = heartbeat;
+ this.uri = uri;
}
/**
@@ -71,30 +69,13 @@ public abstract class GossipMember implements Comparable<GossipMember> {
return clusterName;
}
- /**
- * Get the hostname or IP address of the remote gossip member.
- *
- * @return The hostname or IP address.
- */
- public String getHost() {
- return host;
- }
-
- /**
- * Get the port number of the remote gossip member.
- *
- * @return The port number.
- */
- public int getPort() {
- return port;
- }
-
+
/**
* The member address in the form IP/host:port Similar to the toString in
* {@link InetSocketAddress}
*/
public String getAddress() {
- return host + ":" + port;
+ return uri.getHost() + ":" + uri.getPort();
}
/**
@@ -141,6 +122,10 @@ public abstract class GossipMember implements Comparable<GossipMember> {
return result;
}
+ public URI getUri() {
+ return uri;
+ }
+
/**
* @see java.lang.Object#equals(java.lang.Object)
*/
http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/ddc9a67d/src/main/java/org/apache/gossip/GossipRunner.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/gossip/GossipRunner.java b/src/main/java/org/apache/gossip/GossipRunner.java
index d995cce..c765ed6 100644
--- a/src/main/java/org/apache/gossip/GossipRunner.java
+++ b/src/main/java/org/apache/gossip/GossipRunner.java
@@ -20,12 +20,13 @@ package org.apache.gossip;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
+import java.net.URISyntaxException;
import org.json.JSONException;
public class GossipRunner {
- public static void main(String[] args) {
+ public static void main(String[] args) throws URISyntaxException {
File configFile;
if (args.length == 1) {
configFile = new File("./" + args[0]);
@@ -35,7 +36,7 @@ public class GossipRunner {
new GossipRunner(configFile);
}
- public GossipRunner(File configFile) {
+ public GossipRunner(File configFile) throws URISyntaxException {
if (configFile != null && configFile.exists()) {
try {
System.out.println("Parsing the configuration file...");
http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/ddc9a67d/src/main/java/org/apache/gossip/GossipService.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/gossip/GossipService.java b/src/main/java/org/apache/gossip/GossipService.java
index 9db740e..3175706 100644
--- a/src/main/java/org/apache/gossip/GossipService.java
+++ b/src/main/java/org/apache/gossip/GossipService.java
@@ -18,6 +18,7 @@
package org.apache.gossip;
import java.net.InetAddress;
+import java.net.URI;
import java.net.UnknownHostException;
import java.util.List;
@@ -45,8 +46,8 @@ public class GossipService {
*/
public GossipService(StartupSettings startupSettings) throws InterruptedException,
UnknownHostException {
- this(startupSettings.getCluster(), InetAddress.getLocalHost().getHostAddress(), startupSettings
- .getPort(), startupSettings.getId(), startupSettings.getGossipMembers(),
+ this(startupSettings.getCluster(), startupSettings.getUri()
+ , startupSettings.getId(), startupSettings.getGossipMembers(),
startupSettings.getGossipSettings(), null);
}
@@ -56,18 +57,15 @@ public class GossipService {
* @throws InterruptedException
* @throws UnknownHostException
*/
- public GossipService(String cluster, String ipAddress, int port, String id,
+ public GossipService(String cluster, URI uri, String id,
List<GossipMember> gossipMembers, GossipSettings settings, GossipListener listener)
throws InterruptedException, UnknownHostException {
- gossipManager = new RandomGossipManager(cluster, ipAddress, port, id, settings, gossipMembers,
+ gossipManager = new RandomGossipManager(cluster, uri, id, settings, gossipMembers,
listener);
}
public void start() {
- String address = get_gossipManager().getMyself().getHost() + ":"
- + get_gossipManager().getMyself().getPort();
- LOGGER.debug("Starting: " + gossipManager.getName() + " - " + address);
-
+ LOGGER.debug("Starting: " + gossipManager.getName() + " - " + get_gossipManager().getMyself().getUri());
gossipManager.start();
}
http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/ddc9a67d/src/main/java/org/apache/gossip/LocalGossipMember.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/gossip/LocalGossipMember.java b/src/main/java/org/apache/gossip/LocalGossipMember.java
index 55ce257..d7e9f4e 100644
--- a/src/main/java/org/apache/gossip/LocalGossipMember.java
+++ b/src/main/java/org/apache/gossip/LocalGossipMember.java
@@ -17,6 +17,8 @@
*/
package org.apache.gossip;
+import java.net.URI;
+
import javax.management.NotificationListener;
/**
@@ -32,10 +34,8 @@ public class LocalGossipMember extends GossipMember {
/**
* Constructor.
*
- * @param hostname
- * The hostname or IP address.
- * @param port
- * The port number.
+ * @param uri
+ * The uri of the member
* @param id
* @param heartbeat
* The current heartbeat.
@@ -43,10 +43,9 @@ public class LocalGossipMember extends GossipMember {
* @param cleanupTimeout
* The cleanup timeout for this gossip member.
*/
- public LocalGossipMember(String clusterName, String hostname, int port, String id,
+ public LocalGossipMember(String clusterName, URI uri, String id,
long heartbeat, NotificationListener notificationListener, int cleanupTimeout) {
- super(clusterName, hostname, port, id, heartbeat);
-
+ super(clusterName, uri, id, heartbeat);
timeoutTimer = new GossipTimeoutTimer(cleanupTimeout, notificationListener, this);
}
http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/ddc9a67d/src/main/java/org/apache/gossip/RemoteGossipMember.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/gossip/RemoteGossipMember.java b/src/main/java/org/apache/gossip/RemoteGossipMember.java
index 899da93..88c568a 100644
--- a/src/main/java/org/apache/gossip/RemoteGossipMember.java
+++ b/src/main/java/org/apache/gossip/RemoteGossipMember.java
@@ -17,6 +17,8 @@
*/
package org.apache.gossip;
+import java.net.URI;
+
/**
* The object represents a gossip member with the properties as received from a remote gossip
* member.
@@ -35,19 +37,12 @@ public class RemoteGossipMember extends GossipMember {
* @param heartbeat
* The current heartbeat.
*/
- public RemoteGossipMember(String clusterName, String hostname, int port, String id, long heartbeat) {
- super(clusterName, hostname, port, id, heartbeat);
+ public RemoteGossipMember(String clusterName, URI uri, String id, long heartbeat) {
+ super(clusterName, uri, id, heartbeat);
}
- /**
- * Construct a RemoteGossipMember with a heartbeat of 0.
- *
- * @param hostname
- * The hostname or IP address.
- * @param port
- * The port number.
- */
- public RemoteGossipMember(String clusterName, String hostname, int port, String id) {
- super(clusterName, hostname, port, id, System.currentTimeMillis());
+ public RemoteGossipMember(String clusterName, URI uri, String id) {
+ super(clusterName, uri, id, System.currentTimeMillis());
}
+
}
http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/ddc9a67d/src/main/java/org/apache/gossip/StartupSettings.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/gossip/StartupSettings.java b/src/main/java/org/apache/gossip/StartupSettings.java
index 176a79b..9475536 100644
--- a/src/main/java/org/apache/gossip/StartupSettings.java
+++ b/src/main/java/org/apache/gossip/StartupSettings.java
@@ -22,6 +22,8 @@ import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileReader;
import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.List;
@@ -41,9 +43,8 @@ public class StartupSettings {
/** The id to use for the service */
private String id;
- /** The port to start the gossip service on. */
- private int port;
-
+ private URI uri;
+
private String cluster;
/** The gossip settings used at startup. */
@@ -62,8 +63,16 @@ public class StartupSettings {
* @param logLevel
* unused
*/
- public StartupSettings(String id, int port, int logLevel, String cluster) {
- this(id, port, new GossipSettings(), cluster);
+ public StartupSettings(String id, URI uri, int logLevel, String cluster) {
+ this(id, uri, new GossipSettings(), cluster);
+ }
+
+ public URI getUri() {
+ return uri;
+ }
+
+ public void setUri(URI uri) {
+ this.uri = uri;
}
/**
@@ -74,9 +83,9 @@ public class StartupSettings {
* @param uri
* The URI to start the service on.
*/
- public StartupSettings(String id, int port, GossipSettings gossipSettings, String cluster) {
+ public StartupSettings(String id, URI uri, GossipSettings gossipSettings, String cluster) {
this.id = id;
- this.port = port;
+ this.uri = uri;
this.gossipSettings = gossipSettings;
this.setCluster(cluster);
gossipMembers = new ArrayList<>();
@@ -110,25 +119,6 @@ public class StartupSettings {
}
/**
- * Set the port of the gossip service.
- *
- * @param port
- * The port for the gossip service.
- */
- public void setPort(int port) {
- this.port = port;
- }
-
- /**
- * Get the port for the gossip service.
- *
- * @return The port of the gossip service.
- */
- public int getPort() {
- return port;
- }
-
- /**
* Get the GossipSettings.
*
* @return The GossipSettings object.
@@ -168,9 +158,10 @@ public class StartupSettings {
* Thrown when the file cannot be found.
* @throws IOException
* Thrown when reading the file gives problems.
+ * @throws URISyntaxException Thrown when the service "uri" or a member "uri" in the file cannot be parsed as a URI.
*/
public static StartupSettings fromJSONFile(File jsonFile) throws JSONException,
- FileNotFoundException, IOException {
+ FileNotFoundException, IOException, URISyntaxException {
// Read the file to a String.
StringBuffer buffer = new StringBuffer();
try (BufferedReader br = new BufferedReader(new FileReader(jsonFile)) ){
@@ -181,7 +172,7 @@ public class StartupSettings {
}
JSONObject jsonObject = new JSONArray(buffer.toString()).getJSONObject(0);
- int port = jsonObject.getInt("port");
+ String uri = jsonObject.getString("uri");
String id = jsonObject.getString("id");
int gossipInterval = jsonObject.getInt("gossip_interval");
int cleanupInterval = jsonObject.getInt("cleanup_interval");
@@ -189,7 +180,8 @@ public class StartupSettings {
if (cluster == null){
throw new IllegalArgumentException("cluster was null. It is required");
}
- StartupSettings settings = new StartupSettings(id, port, new GossipSettings(gossipInterval,
+ URI uri2 = new URI(uri);
+ StartupSettings settings = new StartupSettings(id, uri2, new GossipSettings(gossipInterval,
cleanupInterval), cluster);
// Now iterate over the members from the config file and add them to the settings.
@@ -197,8 +189,9 @@ public class StartupSettings {
JSONArray membersJSON = jsonObject.getJSONArray("members");
for (int i = 0; i < membersJSON.length(); i++) {
JSONObject memberJSON = membersJSON.getJSONObject(i);
+ URI uri3 = new URI(memberJSON.getString("uri"));
RemoteGossipMember member = new RemoteGossipMember(memberJSON.getString("cluster"),
- memberJSON.getString("host"), memberJSON.getInt("port"), "");
+ uri3, "", 0);
settings.addGossipMember(member);
configMembersDetails += member.getAddress();
if (i < (membersJSON.length() - 1))
http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/ddc9a67d/src/main/java/org/apache/gossip/examples/GossipExample.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/gossip/examples/GossipExample.java b/src/main/java/org/apache/gossip/examples/GossipExample.java
index e953c77..cea59f4 100644
--- a/src/main/java/org/apache/gossip/examples/GossipExample.java
+++ b/src/main/java/org/apache/gossip/examples/GossipExample.java
@@ -18,6 +18,8 @@
package org.apache.gossip.examples;
import java.net.InetAddress;
+import java.net.URI;
+import java.net.URISyntaxException;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.List;
@@ -57,26 +59,28 @@ public class GossipExample extends Thread {
public void run() {
try {
GossipSettings settings = new GossipSettings();
-
List<GossipService> clients = new ArrayList<>();
-
- // Get my ip address.
String myIpAddress = InetAddress.getLocalHost().getHostAddress();
-
String cluster = "My Gossip Cluster";
// Create the gossip members and put them in a list and give them a port number starting with
// 2000.
List<GossipMember> startupMembers = new ArrayList<>();
for (int i = 0; i < NUMBER_OF_CLIENTS; ++i) {
- startupMembers.add(new RemoteGossipMember(cluster, myIpAddress, 2000 + i, ""));
+ URI u;
+ try {
+ u = new URI("udp://" + myIpAddress + ":" + (2000 + i));
+ } catch (URISyntaxException e) {
+ throw new RuntimeException(e);
+ }
+ startupMembers.add(new RemoteGossipMember(cluster, u, "", 0 ));
}
// Let's start the gossip clients.
// Start the clients, waiting cleaning-interval + 1 second between them which will show the
// dead list handling.
for (GossipMember member : startupMembers) {
- GossipService gossipService = new GossipService(cluster, myIpAddress, member.getPort(), "",
+ GossipService gossipService = new GossipService(cluster, member.getUri(), "",
startupMembers, settings, null);
clients.add(gossipService);
gossipService.start();
http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/ddc9a67d/src/main/java/org/apache/gossip/manager/GossipManager.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/gossip/manager/GossipManager.java b/src/main/java/org/apache/gossip/manager/GossipManager.java
index 80cadf7..363a4a9 100644
--- a/src/main/java/org/apache/gossip/manager/GossipManager.java
+++ b/src/main/java/org/apache/gossip/manager/GossipManager.java
@@ -18,6 +18,8 @@
package org.apache.gossip.manager;
import java.lang.reflect.InvocationTargetException;
+import java.net.BindException;
+import java.net.URI;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
@@ -68,18 +70,18 @@ public abstract class GossipManager extends Thread implements NotificationListen
public GossipManager(Class<? extends PassiveGossipThread> passiveGossipThreadClass,
Class<? extends ActiveGossipThread> activeGossipThreadClass, String cluster,
- String address, int port, String id, GossipSettings settings,
+ URI uri, String id, GossipSettings settings,
List<GossipMember> gossipMembers, GossipListener listener) {
this.passiveGossipThreadClass = passiveGossipThreadClass;
this.activeGossipThreadClass = activeGossipThreadClass;
this.settings = settings;
- me = new LocalGossipMember(cluster, address, port, id, System.currentTimeMillis(), this,
+ me = new LocalGossipMember(cluster, uri, id, System.currentTimeMillis(), this,
settings.getCleanupInterval());
members = new ConcurrentSkipListMap<>();
for (GossipMember startupMember : gossipMembers) {
if (!startupMember.equals(me)) {
LocalGossipMember member = new LocalGossipMember(startupMember.getClusterName(),
- startupMember.getHost(), startupMember.getPort(), startupMember.getId(),
+ startupMember.getUri(), startupMember.getId(),
System.currentTimeMillis(), this, settings.getCleanupInterval());
members.put(member, GossipState.UP);
GossipService.LOGGER.debug(member);
@@ -180,6 +182,9 @@ public abstract class GossipManager extends Thread implements NotificationListen
gossipThreadExecutor.execute(activeGossipThread);
} catch (InstantiationException | IllegalAccessException | IllegalArgumentException
| InvocationTargetException | NoSuchMethodException | SecurityException e1) {
+ if (e1 instanceof BindException){
+ LOGGER.fatal("could not bind to "+ me.getUri() + " " + me.getAddress());
+ }
throw new RuntimeException(e1);
}
GossipService.LOGGER.debug("The GossipService is started.");
http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/ddc9a67d/src/main/java/org/apache/gossip/manager/PassiveGossipThread.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/gossip/manager/PassiveGossipThread.java b/src/main/java/org/apache/gossip/manager/PassiveGossipThread.java
index bd7354e..a057e7d 100644
--- a/src/main/java/org/apache/gossip/manager/PassiveGossipThread.java
+++ b/src/main/java/org/apache/gossip/manager/PassiveGossipThread.java
@@ -23,6 +23,8 @@ import java.net.DatagramSocket;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.net.SocketException;
+import java.net.URI;
+import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -58,18 +60,18 @@ abstract public class PassiveGossipThread implements Runnable {
public PassiveGossipThread(GossipManager gossipManager) {
this.gossipManager = gossipManager;
try {
- SocketAddress socketAddress = new InetSocketAddress(gossipManager.getMyself().getHost(),
- gossipManager.getMyself().getPort());
+ SocketAddress socketAddress = new InetSocketAddress(gossipManager.getMyself().getUri().getHost(),
+ gossipManager.getMyself().getUri().getPort());
server = new DatagramSocket(socketAddress);
- GossipService.LOGGER.debug("Gossip service successfully initialized on port "
- + gossipManager.getMyself().getPort());
- GossipService.LOGGER.debug("I am " + gossipManager.getMyself());
+ LOGGER.debug("Gossip service successfully initialized on port "
+ + gossipManager.getMyself().getUri().getPort());
+ LOGGER.debug("I am " + gossipManager.getMyself());
cluster = gossipManager.getMyself().getClusterName();
if (cluster == null){
throw new IllegalArgumentException("cluster was null");
}
} catch (SocketException ex) {
- GossipService.LOGGER.warn(ex);
+ LOGGER.warn(ex);
throw new RuntimeException(ex);
}
keepRunning = new AtomicBoolean(true);
@@ -103,14 +105,20 @@ abstract public class PassiveGossipThread implements Runnable {
ActiveGossipMessage activeGossipMessage = MAPPER.readValue(json_bytes,
ActiveGossipMessage.class);
for (int i = 0; i < activeGossipMessage.getMembers().size(); i++) {
+ URI u = null;
+ try {
+ u = new URI(activeGossipMessage.getMembers().get(i).getUri());
+ } catch (URISyntaxException e) {
+ LOGGER.debug("Gossip message with faulty URI", e);
+ continue;
+ }
RemoteGossipMember member = new RemoteGossipMember(
activeGossipMessage.getMembers().get(i).getCluster(),
- activeGossipMessage.getMembers().get(i).getHost(),
- activeGossipMessage.getMembers().get(i).getPort(),
+ u,
activeGossipMessage.getMembers().get(i).getId(),
activeGossipMessage.getMembers().get(i).getHeartbeat());
if (!(member.getClusterName().equals(cluster))){
- GossipService.LOGGER.warn("Note a member of this cluster " + i);
+ LOGGER.warn("Note a member of this cluster " + i);
continue;
}
// This is the first member found, so this should be the member who is communicating
@@ -122,16 +130,15 @@ abstract public class PassiveGossipThread implements Runnable {
}
mergeLists(gossipManager, senderMember, remoteGossipMembers);
} catch (RuntimeException ex) {
- GossipService.LOGGER.error("Unable to process message", ex);
+ LOGGER.error("Unable to process message", ex);
}
} else {
- GossipService.LOGGER
+ LOGGER
.error("The received message is not of the expected size, it has been dropped.");
}
} catch (IOException e) {
- GossipService.LOGGER.error(e);
- System.out.println(e);
+ LOGGER.error(e);
keepRunning.set(false);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/ddc9a67d/src/main/java/org/apache/gossip/manager/impl/OnlyProcessReceivedPassiveGossipThread.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/gossip/manager/impl/OnlyProcessReceivedPassiveGossipThread.java b/src/main/java/org/apache/gossip/manager/impl/OnlyProcessReceivedPassiveGossipThread.java
index edf21f3..d0acfc1 100644
--- a/src/main/java/org/apache/gossip/manager/impl/OnlyProcessReceivedPassiveGossipThread.java
+++ b/src/main/java/org/apache/gossip/manager/impl/OnlyProcessReceivedPassiveGossipThread.java
@@ -25,8 +25,11 @@ import org.apache.gossip.LocalGossipMember;
import org.apache.gossip.RemoteGossipMember;
import org.apache.gossip.manager.GossipManager;
import org.apache.gossip.manager.PassiveGossipThread;
+import org.apache.log4j.Logger;
public class OnlyProcessReceivedPassiveGossipThread extends PassiveGossipThread {
+
+ public static final Logger LOGGER = Logger.getLogger(OnlyProcessReceivedPassiveGossipThread.class);
public OnlyProcessReceivedPassiveGossipThread(GossipManager gossipManager) {
super(gossipManager);
@@ -47,9 +50,9 @@ public class OnlyProcessReceivedPassiveGossipThread extends PassiveGossipThread
// if the person sending to us is in the dead list consider them up
for (LocalGossipMember i : gossipManager.getDeadList()) {
if (i.getId().equals(senderMember.getId())) {
- System.out.println(gossipManager.getMyself() + " caught a live one!");
+ LOGGER.info(gossipManager.getMyself() + " contacted by dead member " + senderMember.getUri());
LocalGossipMember newLocalMember = new LocalGossipMember(senderMember.getClusterName(),
- senderMember.getHost(), senderMember.getPort(), senderMember.getId(),
+ senderMember.getUri(), senderMember.getId(),
senderMember.getHeartbeat(), gossipManager, gossipManager.getSettings()
.getCleanupInterval());
gossipManager.revivieMember(newLocalMember);
@@ -70,7 +73,7 @@ public class OnlyProcessReceivedPassiveGossipThread extends PassiveGossipThread
} else if (!gossipManager.getMemberList().contains(remoteMember)
&& !gossipManager.getDeadList().contains(remoteMember)) {
LocalGossipMember newLocalMember = new LocalGossipMember(remoteMember.getClusterName(),
- remoteMember.getHost(), remoteMember.getPort(), remoteMember.getId(),
+ remoteMember.getUri(), remoteMember.getId(),
remoteMember.getHeartbeat(), gossipManager, gossipManager.getSettings()
.getCleanupInterval());
gossipManager.createOrRevivieMember(newLocalMember);
@@ -81,26 +84,26 @@ public class OnlyProcessReceivedPassiveGossipThread extends PassiveGossipThread
gossipManager.getDeadList().indexOf(remoteMember));
if (remoteMember.getHeartbeat() > localDeadMember.getHeartbeat()) {
LocalGossipMember newLocalMember = new LocalGossipMember(remoteMember.getClusterName(),
- remoteMember.getHost(), remoteMember.getPort(), remoteMember.getId(),
+ remoteMember.getUri(), remoteMember.getId(),
remoteMember.getHeartbeat(), gossipManager, gossipManager.getSettings()
.getCleanupInterval());
gossipManager.revivieMember(newLocalMember);
newLocalMember.startTimeoutTimer();
- GossipService.LOGGER.debug("Removed remote member " + remoteMember.getAddress()
+ LOGGER.debug("Removed remote member " + remoteMember.getAddress()
+ " from dead list and added to local member list.");
} else {
- GossipService.LOGGER.debug("me " + gossipManager.getMyself());
- GossipService.LOGGER.debug("sender " + senderMember);
- GossipService.LOGGER.debug("remote " + remoteList);
- GossipService.LOGGER.debug("live " + gossipManager.getMemberList());
- GossipService.LOGGER.debug("dead " + gossipManager.getDeadList());
+ LOGGER.debug("me " + gossipManager.getMyself());
+ LOGGER.debug("sender " + senderMember);
+ LOGGER.debug("remote " + remoteList);
+ LOGGER.debug("live " + gossipManager.getMemberList());
+ LOGGER.debug("dead " + gossipManager.getDeadList());
}
} else {
- GossipService.LOGGER.debug("me " + gossipManager.getMyself());
- GossipService.LOGGER.debug("sender " + senderMember);
- GossipService.LOGGER.debug("remote " + remoteList);
- GossipService.LOGGER.debug("live " + gossipManager.getMemberList());
- GossipService.LOGGER.debug("dead " + gossipManager.getDeadList());
+ LOGGER.debug("me " + gossipManager.getMyself());
+ LOGGER.debug("sender " + senderMember);
+ LOGGER.debug("remote " + remoteList);
+ LOGGER.debug("live " + gossipManager.getMemberList());
+ LOGGER.debug("dead " + gossipManager.getDeadList());
// throw new IllegalArgumentException("wtf");
}
}
http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/ddc9a67d/src/main/java/org/apache/gossip/manager/impl/SendMembersActiveGossipThread.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/gossip/manager/impl/SendMembersActiveGossipThread.java b/src/main/java/org/apache/gossip/manager/impl/SendMembersActiveGossipThread.java
index 16d0d32..c296156 100644
--- a/src/main/java/org/apache/gossip/manager/impl/SendMembersActiveGossipThread.java
+++ b/src/main/java/org/apache/gossip/manager/impl/SendMembersActiveGossipThread.java
@@ -44,9 +44,8 @@ abstract public class SendMembersActiveGossipThread extends ActiveGossipThread {
GossipMember gm = new GossipMember();
gm.setCluster(member.getClusterName());
gm.setHeartbeat(member.getHeartbeat());
- gm.setHost(member.getHost());
+ gm.setUri(member.getUri().toASCIIString());
gm.setId(member.getId());
- gm.setPort(member.getPort());
return gm;
}
@@ -62,7 +61,7 @@ abstract public class SendMembersActiveGossipThread extends ActiveGossipThread {
}
try (DatagramSocket socket = new DatagramSocket()) {
socket.setSoTimeout(gossipManager.getSettings().getGossipInterval());
- InetAddress dest = InetAddress.getByName(member.getHost());
+ InetAddress dest = InetAddress.getByName(member.getUri().getHost());
ActiveGossipMessage message = new ActiveGossipMessage();
message.getMembers().add(convert(me));
for (LocalGossipMember other : memberList) {
@@ -72,7 +71,7 @@ abstract public class SendMembersActiveGossipThread extends ActiveGossipThread {
int packet_length = json_bytes.length;
if (packet_length < GossipManager.MAX_PACKET_SIZE) {
byte[] buf = createBuffer(packet_length, json_bytes);
- DatagramPacket datagramPacket = new DatagramPacket(buf, buf.length, dest, member.getPort());
+ DatagramPacket datagramPacket = new DatagramPacket(buf, buf.length, dest, member.getUri().getPort());
socket.send(datagramPacket);
} else {
GossipService.LOGGER.error("The length of the to be send message is too large ("
http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/ddc9a67d/src/main/java/org/apache/gossip/manager/random/RandomGossipManager.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/gossip/manager/random/RandomGossipManager.java b/src/main/java/org/apache/gossip/manager/random/RandomGossipManager.java
index 0122610..7aa4435 100644
--- a/src/main/java/org/apache/gossip/manager/random/RandomGossipManager.java
+++ b/src/main/java/org/apache/gossip/manager/random/RandomGossipManager.java
@@ -23,12 +23,13 @@ import org.apache.gossip.event.GossipListener;
import org.apache.gossip.manager.GossipManager;
import org.apache.gossip.manager.impl.OnlyProcessReceivedPassiveGossipThread;
+import java.net.URI;
import java.util.List;
public class RandomGossipManager extends GossipManager {
- public RandomGossipManager(String cluster, String address, int port, String id,
+ public RandomGossipManager(String cluster, URI uri, String id,
GossipSettings settings, List<GossipMember> gossipMembers, GossipListener listener) {
super(OnlyProcessReceivedPassiveGossipThread.class, RandomActiveGossipThread.class, cluster,
- address, port, id, settings, gossipMembers, listener);
+ uri, id, settings, gossipMembers, listener);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/ddc9a67d/src/main/java/org/apache/gossip/model/GossipMember.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/gossip/model/GossipMember.java b/src/main/java/org/apache/gossip/model/GossipMember.java
index 8dc6bf7..413ab71 100644
--- a/src/main/java/org/apache/gossip/model/GossipMember.java
+++ b/src/main/java/org/apache/gossip/model/GossipMember.java
@@ -3,8 +3,7 @@ package org.apache.gossip.model;
public class GossipMember {
private String cluster;
- private String host;
- private Integer port;
+ private String uri;
private String id;
private Long heartbeat;
@@ -12,12 +11,11 @@ public class GossipMember {
}
- public GossipMember(String cluster, String host, Integer port, String id, Long heartbeat){
- this.cluster=cluster;
- this.host= host;
- this.port = port;
+ public GossipMember(String cluster, String uri, String id, Long heartbeat){
+ this.cluster = cluster;
+ this.uri = uri;
this.id = id;
-
+ this.heartbeat = heartbeat;
}
public String getCluster() {
@@ -28,20 +26,12 @@ public class GossipMember {
this.cluster = cluster;
}
- public String getHost() {
- return host;
- }
-
- public void setHost(String host) {
- this.host = host;
- }
-
- public Integer getPort() {
- return port;
+ public String getUri() {
+ return uri;
}
- public void setPort(Integer port) {
- this.port = port;
+ public void setUri(String uri) {
+ this.uri = uri;
}
public String getId() {
http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/ddc9a67d/src/test/java/io/teknek/gossip/ShutdownDeadtimeTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/io/teknek/gossip/ShutdownDeadtimeTest.java b/src/test/java/io/teknek/gossip/ShutdownDeadtimeTest.java
index 2d8190b..340886a 100644
--- a/src/test/java/io/teknek/gossip/ShutdownDeadtimeTest.java
+++ b/src/test/java/io/teknek/gossip/ShutdownDeadtimeTest.java
@@ -19,6 +19,8 @@ package io.teknek.gossip;
import io.teknek.tunit.TUnit;
+import java.net.URI;
+import java.net.URISyntaxException;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.List;
@@ -43,7 +45,7 @@ public class ShutdownDeadtimeTest {
private static final Logger log = Logger.getLogger(ShutdownDeadtimeTest.class );
@Test
//@Ignore
- public void DeadNodesDoNotComeAliveAgain() throws InterruptedException, UnknownHostException {
+ public void DeadNodesDoNotComeAliveAgain() throws InterruptedException, UnknownHostException, URISyntaxException {
GossipSettings settings = new GossipSettings(1000, 10000);
String cluster = UUID.randomUUID().toString();
@@ -51,7 +53,8 @@ public class ShutdownDeadtimeTest {
int seedNodes = 3;
List<GossipMember> startupMembers = new ArrayList<>();
for (int i = 1; i < seedNodes + 1; ++i) {
- startupMembers.add(new RemoteGossipMember(cluster, "127.0.0.1", 50000 + i, i + ""));
+ URI uri = new URI("udp://" + "127.0.0.1" + ":" + (50000 + i));
+ startupMembers.add(new RemoteGossipMember(cluster, uri, i + ""));
}
log.info( "Adding clients" );
@@ -59,7 +62,8 @@ public class ShutdownDeadtimeTest {
final int clusterMembers = 5;
for (int i = 1; i < clusterMembers+1; ++i) {
final int j = i;
- GossipService gossipService = new GossipService(cluster, "127.0.0.1", 50000 + i, i + "",
+ URI uri = new URI("udp://" + "127.0.0.1" + ":" + (50000 + i));
+ GossipService gossipService = new GossipService(cluster, uri, i + "",
startupMembers, settings,
new GossipListener(){
@Override
@@ -83,7 +87,7 @@ public class ShutdownDeadtimeTest {
Random r = new Random();
int randomClientId = r.nextInt(clusterMembers);
log.info( "shutting down " + randomClientId );
- final int shutdownPort = clients.get(randomClientId).get_gossipManager().getMyself().getPort();
+ final int shutdownPort = clients.get(randomClientId).get_gossipManager().getMyself().getUri().getPort();
final String shutdownId = clients.get(randomClientId).get_gossipManager().getMyself().getId();
clients.get(randomClientId).shutdown();
TUnit.assertThat(new Callable<Integer> (){
@@ -105,8 +109,9 @@ public class ShutdownDeadtimeTest {
return total;
}}).afterWaitingAtMost(10, TimeUnit.SECONDS).isEqualTo(4);
+ URI uri = new URI("udp://" + "127.0.0.1" + ":" + shutdownPort);
// start client again
- GossipService gossipService = new GossipService(cluster, "127.0.0.1", shutdownPort, shutdownId + "",
+ GossipService gossipService = new GossipService(cluster, uri, shutdownId + "",
startupMembers, settings,
new GossipListener(){
@Override
http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/ddc9a67d/src/test/java/io/teknek/gossip/StartupSettingsTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/io/teknek/gossip/StartupSettingsTest.java b/src/test/java/io/teknek/gossip/StartupSettingsTest.java
index aa4e404..a4a9011 100644
--- a/src/test/java/io/teknek/gossip/StartupSettingsTest.java
+++ b/src/test/java/io/teknek/gossip/StartupSettingsTest.java
@@ -30,6 +30,8 @@ import io.teknek.tunit.TUnit;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.UUID;
import java.util.concurrent.Callable;
@@ -44,13 +46,14 @@ public class StartupSettingsTest {
private static final String CLUSTER = UUID.randomUUID().toString();
@Test
- public void testUsingSettingsFile() throws IOException, InterruptedException, JSONException {
+ public void testUsingSettingsFile() throws IOException, InterruptedException, JSONException, URISyntaxException {
File settingsFile = File.createTempFile("gossipTest",".json");
log.debug( "Using settings file: " + settingsFile.getAbsolutePath() );
settingsFile.deleteOnExit();
writeSettingsFile(settingsFile);
+ URI uri = new URI("udp://" + "127.0.0.1" + ":" + 50000);
final GossipService firstService = new GossipService(
- CLUSTER, "127.0.0.1", 50000, UUID.randomUUID().toString(),
+ CLUSTER, uri, UUID.randomUUID().toString(),
new ArrayList<GossipMember>(), new GossipSettings(), null);
firstService.start();
@@ -76,11 +79,11 @@ public class StartupSettingsTest {
"[{\n" + // It is odd that this is meant to be in an array, but oh well.
" \"cluster\":\"" + CLUSTER + "\",\n" +
" \"id\":\"" + UUID.randomUUID() + "\",\n" +
- " \"port\":50001,\n" +
+ " \"uri\":\"udp://127.0.0.1:50001\",\n" +
" \"gossip_interval\":1000,\n" +
" \"cleanup_interval\":10000,\n" +
" \"members\":[\n" +
- " {\"cluster\": \"" + CLUSTER + "\",\"host\":\"127.0.0.1\", \"port\":50000}\n" +
+ " {\"cluster\": \"" + CLUSTER + "\",\"uri\":\"udp://127.0.0.1:50000\"}\n" +
" ]\n" +
"}]";
http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/ddc9a67d/src/test/java/io/teknek/gossip/TenNodeThreeSeedTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/io/teknek/gossip/TenNodeThreeSeedTest.java b/src/test/java/io/teknek/gossip/TenNodeThreeSeedTest.java
index 4e731ae..2b5f7fe 100644
--- a/src/test/java/io/teknek/gossip/TenNodeThreeSeedTest.java
+++ b/src/test/java/io/teknek/gossip/TenNodeThreeSeedTest.java
@@ -19,6 +19,8 @@ package io.teknek.gossip;
import io.teknek.tunit.TUnit;
+import java.net.URI;
+import java.net.URISyntaxException;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.List;
@@ -41,16 +43,16 @@ public class TenNodeThreeSeedTest {
private static final Logger log = Logger.getLogger( TenNodeThreeSeedTest.class );
@Test
- public void test() throws UnknownHostException, InterruptedException{
+ public void test() throws UnknownHostException, InterruptedException, URISyntaxException{
abc();
}
@Test
- public void testAgain() throws UnknownHostException, InterruptedException{
+ public void testAgain() throws UnknownHostException, InterruptedException, URISyntaxException{
abc();
}
- public void abc() throws InterruptedException, UnknownHostException{
+ public void abc() throws InterruptedException, UnknownHostException, URISyntaxException{
GossipSettings settings = new GossipSettings();
String cluster = UUID.randomUUID().toString();
@@ -58,14 +60,16 @@ public class TenNodeThreeSeedTest {
int seedNodes = 3;
List<GossipMember> startupMembers = new ArrayList<>();
for (int i = 1; i < seedNodes+1; ++i) {
- startupMembers.add(new RemoteGossipMember(cluster, "127.0.0.1", 50000 + i, i + ""));
+ URI uri = new URI("udp://" + "127.0.0.1" + ":" + (50000 + i));
+ startupMembers.add(new RemoteGossipMember(cluster, uri, i + ""));
}
log.info( "Adding clients" );
final List<GossipService> clients = new ArrayList<>();
final int clusterMembers = 5;
for (int i = 1; i < clusterMembers+1; ++i) {
- GossipService gossipService = new GossipService(cluster, "127.0.0.1", 50000 + i, i + "",
+ URI uri = new URI("udp://" + "127.0.0.1" + ":" + (50000 + i));
+ GossipService gossipService = new GossipService(cluster, uri, i + "",
startupMembers, settings,
new GossipListener(){
@Override
@@ -75,7 +79,6 @@ public class TenNodeThreeSeedTest {
});
clients.add(gossipService);
gossipService.start();
- gossipService.get_gossipManager().getMemberList();
}
TUnit.assertThat(new Callable<Integer> (){
public Integer call() throws Exception {
@@ -84,7 +87,7 @@ public class TenNodeThreeSeedTest {
total += clients.get(i).get_gossipManager().getMemberList().size();
}
return total;
- }}).afterWaitingAtMost(10, TimeUnit.SECONDS).isEqualTo(20);
+ }}).afterWaitingAtMost(20, TimeUnit.SECONDS).isEqualTo(20);
for (int i = 0; i < clusterMembers; ++i) {
clients.get(i).shutdown();
[06/12] incubator-gossip git commit: Too much logging
Posted by ec...@apache.org.
Too much logging
Project: http://git-wip-us.apache.org/repos/asf/incubator-gossip/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gossip/commit/266e039b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gossip/tree/266e039b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gossip/diff/266e039b
Branch: refs/heads/master
Commit: 266e039b29a23989c918a8b7113d6e9c7fcca8da
Parents: 29f5966
Author: Edward Capriolo <ed...@gmail.com>
Authored: Mon May 16 19:03:34 2016 -0400
Committer: Edward Capriolo <ed...@gmail.com>
Committed: Tue Jun 7 22:52:53 2016 -0400
----------------------------------------------------------------------
.../com/google/code/gossip/manager/PassiveGossipThread.java | 6 ++++--
1 file changed, 4 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/266e039b/src/main/java/com/google/code/gossip/manager/PassiveGossipThread.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/google/code/gossip/manager/PassiveGossipThread.java b/src/main/java/com/google/code/gossip/manager/PassiveGossipThread.java
index 5abb39f..ec11cfe 100644
--- a/src/main/java/com/google/code/gossip/manager/PassiveGossipThread.java
+++ b/src/main/java/com/google/code/gossip/manager/PassiveGossipThread.java
@@ -96,9 +96,11 @@ abstract public class PassiveGossipThread implements Runnable {
for (int i = 0; i < packet_length; i++) {
json_bytes[i] = buf[i + 4];
}
- String receivedMessage = new String(json_bytes);
- GossipService.LOGGER.warn("Received message (" + packet_length + " bytes): "
+ if (GossipService.LOGGER.isDebugEnabled()){
+ String receivedMessage = new String(json_bytes);
+ GossipService.LOGGER.debug("Received message (" + packet_length + " bytes): "
+ receivedMessage);
+ }
try {
List<GossipMember> remoteGossipMembers = new ArrayList<>();
RemoteGossipMember senderMember = null;
[12/12] incubator-gossip git commit: GOSSIP-2 merge
Posted by ec...@apache.org.
GOSSIP-2 merge
Project: http://git-wip-us.apache.org/repos/asf/incubator-gossip/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gossip/commit/2c1dc437
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gossip/tree/2c1dc437
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gossip/diff/2c1dc437
Branch: refs/heads/master
Commit: 2c1dc4375bb1123334468488db72062546a9a9f2
Parents: 496f299 fe196cd
Author: Edward Capriolo <ed...@gmail.com>
Authored: Tue Jun 7 23:10:14 2016 -0400
Committer: Edward Capriolo <ed...@gmail.com>
Committed: Tue Jun 7 23:10:14 2016 -0400
----------------------------------------------------------------------
.../java/org/apache/gossip/GossipService.java | 3 +-
.../manager/random/RandomGossipManager.java | 72 +++++++++++++++++++-
.../manager/RandomGossipManagerBuilderTest.java | 16 ++---
3 files changed, 79 insertions(+), 12 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/2c1dc437/src/main/java/org/apache/gossip/GossipService.java
----------------------------------------------------------------------
diff --cc src/main/java/org/apache/gossip/GossipService.java
index 866dc03,cf727a8..b149719
--- a/src/main/java/org/apache/gossip/GossipService.java
+++ b/src/main/java/org/apache/gossip/GossipService.java
@@@ -63,8 -62,8 +63,7 @@@ public class GossipService
gossipManager = RandomGossipManager.newBuilder()
.withId(id)
.cluster(cluster)
-- .address(ipAddress)
-- .port(port)
++ .uri(uri)
.settings(settings)
.gossipMembers(gossipMembers)
.listener(listener)
http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/2c1dc437/src/main/java/org/apache/gossip/manager/random/RandomGossipManager.java
----------------------------------------------------------------------
diff --cc src/main/java/org/apache/gossip/manager/random/RandomGossipManager.java
index 7aa4435,d407d2a..1d2075e
--- a/src/main/java/org/apache/gossip/manager/random/RandomGossipManager.java
+++ b/src/main/java/org/apache/gossip/manager/random/RandomGossipManager.java
@@@ -23,13 -23,83 +23,81 @@@ import org.apache.gossip.event.GossipLi
import org.apache.gossip.manager.GossipManager;
import org.apache.gossip.manager.impl.OnlyProcessReceivedPassiveGossipThread;
+import java.net.URI;
+import java.util.List;
+
+ import java.util.ArrayList;
+ import java.util.List;
+
public class RandomGossipManager extends GossipManager {
- public RandomGossipManager(String cluster, URI uri, String id,
- GossipSettings settings, List<GossipMember> gossipMembers, GossipListener listener) {
+
+ public static ManagerBuilder newBuilder() {
+ return new ManagerBuilder();
+ }
+
+ public static final class ManagerBuilder {
+ private String cluster;
- private String address;
- private int port;
++ private URI uri;
+ private String id;
+ private GossipSettings settings;
+ private List<GossipMember> gossipMembers;
+ private GossipListener listener;
+
+ private ManagerBuilder() {}
+
+ private void checkArgument(boolean check, String msg) {
+ if (!check) {
+ throw new IllegalArgumentException(msg);
+ }
+ }
+
+ public ManagerBuilder cluster(String cluster) {
+ this.cluster = cluster;
+ return this;
+ }
+
- public ManagerBuilder address(String address) {
- this.address = address;
- return this;
- }
-
- public ManagerBuilder port(int port) {
- this.port = port;
- return this;
- }
-
+ public ManagerBuilder withId(String id) {
+ this.id = id;
+ return this;
+ }
+
+ public ManagerBuilder settings(GossipSettings settings) {
+ this.settings = settings;
+ return this;
+ }
+
+ public ManagerBuilder gossipMembers(List<GossipMember> members) {
+ this.gossipMembers = members;
+ return this;
+ }
+
+ public ManagerBuilder listener(GossipListener listener) {
+ this.listener = listener;
+ return this;
+ }
+
++ public ManagerBuilder uri(URI uri){
++ this.uri = uri;
++ return this;
++ }
++
+ public RandomGossipManager build() {
+ checkArgument(id != null, "You must specify an id");
+ checkArgument(cluster != null, "You must specify a cluster name");
+ checkArgument(settings != null, "You must specify gossip settings");
++ checkArgument(uri != null, "You must specify a uri");
+
+ if (this.gossipMembers == null) {
+ this.gossipMembers = new ArrayList<>();
+ }
+
- return new RandomGossipManager(cluster, address, port, id, settings, gossipMembers, listener);
++ return new RandomGossipManager(cluster, uri, id, settings, gossipMembers, listener);
+ }
+ }
+
- private RandomGossipManager(String cluster, String address, int port, String id,
- GossipSettings settings, List<GossipMember> gossipMembers, GossipListener listener) {
++ private RandomGossipManager(String cluster, URI uri, String id, GossipSettings settings,
++ List<GossipMember> gossipMembers, GossipListener listener) {
super(OnlyProcessReceivedPassiveGossipThread.class, RandomActiveGossipThread.class, cluster,
- address, port, id, settings, gossipMembers, listener);
+ uri, id, settings, gossipMembers, listener);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/2c1dc437/src/test/java/org/apache/gossip/manager/RandomGossipManagerBuilderTest.java
----------------------------------------------------------------------
diff --cc src/test/java/org/apache/gossip/manager/RandomGossipManagerBuilderTest.java
index 38b8ab4,38b8ab4..d753676
--- a/src/test/java/org/apache/gossip/manager/RandomGossipManagerBuilderTest.java
+++ b/src/test/java/org/apache/gossip/manager/RandomGossipManagerBuilderTest.java
@@@ -28,6 -28,6 +28,9 @@@ import org.junit.Test
import javax.management.Notification;
import javax.management.NotificationListener;
++
++import java.net.URI;
++import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.List;
@@@ -63,12 -63,12 +66,11 @@@ public class RandomGossipManagerBuilder
}
@Test
-- public void createMembersListIfNull() {
++ public void createMembersListIfNull() throws URISyntaxException {
RandomGossipManager gossipManager = RandomGossipManager.newBuilder()
.withId("id")
.cluster("aCluster")
-- .port(8080)
-- .address("localhost")
++ .uri(new URI("udp://localhost:2000"))
.settings(new GossipSettings())
.gossipMembers(null).build();
@@@ -76,19 -76,19 +78,17 @@@
}
@Test
-- public void useMemberListIfProvided() {
-- LocalGossipMember member = new LocalGossipMember("aCluster", "localhost", 2000, "aGossipMember",
++ public void useMemberListIfProvided() throws URISyntaxException {
++ LocalGossipMember member = new LocalGossipMember("aCluster", new URI("udp://localhost:2000"), "aGossipMember",
System.currentTimeMillis(), new TestNotificationListener(), 60000);
--
List<GossipMember> memberList = new ArrayList<>();
memberList.add(member);
--
RandomGossipManager gossipManager = RandomGossipManager.newBuilder()
.withId("id")
.cluster("aCluster")
.settings(new GossipSettings())
++ .uri(new URI("udp://localhost:8000"))
.gossipMembers(memberList).build();
--
Assert.assertEquals(1, gossipManager.getMemberList().size());
Assert.assertEquals(member.getId(), gossipManager.getMemberList().get(0).getId());
}
[11/12] incubator-gossip git commit: Rebase
Posted by ec...@apache.org.
Rebase
Project: http://git-wip-us.apache.org/repos/asf/incubator-gossip/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gossip/commit/496f299d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gossip/tree/496f299d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gossip/diff/496f299d
Branch: refs/heads/master
Commit: 496f299d018a9b9bb2902d60156316e4c8e47142
Parents: 900bfda 5532585
Author: Edward Capriolo <ed...@gmail.com>
Authored: Tue Jun 7 22:55:48 2016 -0400
Committer: Edward Capriolo <ed...@gmail.com>
Committed: Tue Jun 7 22:55:48 2016 -0400
----------------------------------------------------------------------
.../java/org/apache/gossip/GossipService.java | 5 --
.../manager/random/RandomGossipManager.java | 78 --------------------
2 files changed, 83 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/496f299d/src/main/java/org/apache/gossip/GossipService.java
----------------------------------------------------------------------
diff --cc src/main/java/org/apache/gossip/GossipService.java
index d07c070,3175706..866dc03
--- a/src/main/java/org/apache/gossip/GossipService.java
+++ b/src/main/java/org/apache/gossip/GossipService.java
@@@ -60,20 -60,8 +60,15 @@@ public class GossipService
public GossipService(String cluster, URI uri, String id,
List<GossipMember> gossipMembers, GossipSettings settings, GossipListener listener)
throws InterruptedException, UnknownHostException {
- <<<<<<< HEAD
-- gossipManager = new RandomGossipManager(cluster, uri, id, settings, gossipMembers,
-- listener);
- =======
+ gossipManager = RandomGossipManager.newBuilder()
+ .withId(id)
+ .cluster(cluster)
+ .address(ipAddress)
+ .port(port)
+ .settings(settings)
+ .gossipMembers(gossipMembers)
+ .listener(listener)
+ .build();
- >>>>>>> fe196cd... GOSSIP-4: Use builder to create RandomGossipManager (Jaideep Dhok via EGC)
}
public void start() {
[04/12] incubator-gossip git commit: clean up warnings
Posted by ec...@apache.org.
clean up warnings
Project: http://git-wip-us.apache.org/repos/asf/incubator-gossip/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gossip/commit/6a35db7c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gossip/tree/6a35db7c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gossip/diff/6a35db7c
Branch: refs/heads/master
Commit: 6a35db7cb901339a3badbf3374721a8a70c7868c
Parents: 8e8db1c
Author: Edward Capriolo <ed...@gmail.com>
Authored: Mon May 16 21:33:45 2016 -0400
Committer: Edward Capriolo <ed...@gmail.com>
Committed: Tue Jun 7 22:52:53 2016 -0400
----------------------------------------------------------------------
src/main/java/com/google/code/gossip/GossipMember.java | 3 ---
src/main/java/com/google/code/gossip/event/GossipState.java | 1 +
src/test/java/io/teknek/gossip/StartupSettingsTest.java | 3 ---
src/test/java/io/teknek/gossip/TenNodeThreeSeedTest.java | 1 -
4 files changed, 1 insertion(+), 7 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/6a35db7c/src/main/java/com/google/code/gossip/GossipMember.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/google/code/gossip/GossipMember.java b/src/main/java/com/google/code/gossip/GossipMember.java
index 314b5b7..56029fa 100644
--- a/src/main/java/com/google/code/gossip/GossipMember.java
+++ b/src/main/java/com/google/code/gossip/GossipMember.java
@@ -19,9 +19,6 @@ package com.google.code.gossip;
import java.net.InetSocketAddress;
-import org.json.JSONException;
-import org.json.JSONObject;
-
/**
* A abstract class representing a gossip member.
*
http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/6a35db7c/src/main/java/com/google/code/gossip/event/GossipState.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/google/code/gossip/event/GossipState.java b/src/main/java/com/google/code/gossip/event/GossipState.java
index c0bc565..e303c89 100644
--- a/src/main/java/com/google/code/gossip/event/GossipState.java
+++ b/src/main/java/com/google/code/gossip/event/GossipState.java
@@ -19,6 +19,7 @@ package com.google.code.gossip.event;
public enum GossipState {
UP("up"), DOWN("down");
+ @SuppressWarnings("unused")
private final String state;
private GossipState(String state) {
http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/6a35db7c/src/test/java/io/teknek/gossip/StartupSettingsTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/io/teknek/gossip/StartupSettingsTest.java b/src/test/java/io/teknek/gossip/StartupSettingsTest.java
index 5f25dd9..bf6710e 100644
--- a/src/test/java/io/teknek/gossip/StartupSettingsTest.java
+++ b/src/test/java/io/teknek/gossip/StartupSettingsTest.java
@@ -35,9 +35,6 @@ import java.util.UUID;
import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-
/**
* Tests support of using {@code StartupSettings} and thereby reading
* setup config from file.
http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/6a35db7c/src/test/java/io/teknek/gossip/TenNodeThreeSeedTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/io/teknek/gossip/TenNodeThreeSeedTest.java b/src/test/java/io/teknek/gossip/TenNodeThreeSeedTest.java
index 0065ade..277d0fe 100644
--- a/src/test/java/io/teknek/gossip/TenNodeThreeSeedTest.java
+++ b/src/test/java/io/teknek/gossip/TenNodeThreeSeedTest.java
@@ -28,7 +28,6 @@ import java.util.concurrent.TimeUnit;
import org.apache.log4j.Logger;
-import org.junit.Assert;
import org.junit.Test;
import com.google.code.gossip.GossipMember;
[03/12] incubator-gossip git commit: use jackson
Posted by ec...@apache.org.
use jackson
Project: http://git-wip-us.apache.org/repos/asf/incubator-gossip/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gossip/commit/29f59664
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gossip/tree/29f59664
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gossip/diff/29f59664
Branch: refs/heads/master
Commit: 29f5966463e372c00ad7f2e26b2e4ff69d6b8b0e
Parents: 176e173
Author: Edward Capriolo <ed...@gmail.com>
Authored: Mon May 16 18:54:17 2016 -0400
Committer: Edward Capriolo <ed...@gmail.com>
Committed: Tue Jun 7 22:52:53 2016 -0400
----------------------------------------------------------------------
.../gossip/manager/PassiveGossipThread.java | 57 ++++++++----------
.../impl/SendMembersActiveGossipThread.java | 25 ++++++--
.../code/gossip/model/ActiveGossipMessage.java | 22 +++++++
.../google/code/gossip/model/GossipMember.java | 63 ++++++++++++++++++++
4 files changed, 129 insertions(+), 38 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/29f59664/src/main/java/com/google/code/gossip/manager/PassiveGossipThread.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/google/code/gossip/manager/PassiveGossipThread.java b/src/main/java/com/google/code/gossip/manager/PassiveGossipThread.java
index b05a780..5abb39f 100644
--- a/src/main/java/com/google/code/gossip/manager/PassiveGossipThread.java
+++ b/src/main/java/com/google/code/gossip/manager/PassiveGossipThread.java
@@ -28,6 +28,7 @@ import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.log4j.Logger;
+import org.codehaus.jackson.map.ObjectMapper;
import org.json.JSONArray;
import org.json.JSONException;
import org.json.JSONObject;
@@ -35,6 +36,7 @@ import org.json.JSONObject;
import com.google.code.gossip.GossipMember;
import com.google.code.gossip.GossipService;
import com.google.code.gossip.RemoteGossipMember;
+import com.google.code.gossip.model.ActiveGossipMessage;
/**
* [The passive thread: reply to incoming gossip request.] This class handles the passive cycle,
@@ -54,6 +56,8 @@ abstract public class PassiveGossipThread implements Runnable {
private AtomicBoolean keepRunning;
private final String cluster;
+
+ private ObjectMapper MAPPER = new ObjectMapper();
public PassiveGossipThread(GossipManager gossipManager) {
this.gossipManager = gossipManager;
@@ -93,46 +97,35 @@ abstract public class PassiveGossipThread implements Runnable {
json_bytes[i] = buf[i + 4];
}
String receivedMessage = new String(json_bytes);
- GossipService.LOGGER.debug("Received message (" + packet_length + " bytes): "
+ GossipService.LOGGER.warn("Received message (" + packet_length + " bytes): "
+ receivedMessage);
try {
List<GossipMember> remoteGossipMembers = new ArrayList<>();
RemoteGossipMember senderMember = null;
- JSONArray jsonArray = new JSONArray(receivedMessage);
- for (int i = 0; i < jsonArray.length(); i++) {
- JSONObject memberJSONObject = jsonArray.getJSONObject(i);
- if (memberJSONObject.length() == 5
- && cluster.equals(memberJSONObject.get(GossipMember.JSON_CLUSTER))) {
- RemoteGossipMember member = new RemoteGossipMember(
- memberJSONObject.getString(GossipMember.JSON_CLUSTER),
- memberJSONObject.getString(GossipMember.JSON_HOST),
- memberJSONObject.getInt(GossipMember.JSON_PORT),
- memberJSONObject.getString(GossipMember.JSON_ID),
- memberJSONObject.getLong(GossipMember.JSON_HEARTBEAT));
- GossipService.LOGGER.debug(member.toString());
- // This is the first member found, so this should be the member who is communicating
- // with me.
- if (i == 0) {
- senderMember = member;
- }
- remoteGossipMembers.add(member);
- } else if (memberJSONObject.length() == 5) {
- GossipService.LOGGER.warn("The member object does not belong to this cluster.");
- } else {
- GossipService.LOGGER
- .error("The received member object does not contain 5 objects:\n"
- + memberJSONObject.toString());
+ ActiveGossipMessage activeGossipMessage = MAPPER.readValue(json_bytes,
+ ActiveGossipMessage.class);
+ for (int i = 0; i < activeGossipMessage.getMembers().size(); i++) {
+ RemoteGossipMember member = new RemoteGossipMember(
+ activeGossipMessage.getMembers().get(i).getCluster(),
+ activeGossipMessage.getMembers().get(i).getHost(),
+ activeGossipMessage.getMembers().get(i).getPort(),
+ activeGossipMessage.getMembers().get(i).getId(),
+ activeGossipMessage.getMembers().get(i).getHeartbeat());
+ if (!(member.getClusterName().equals(cluster))){
+ GossipService.LOGGER.warn("Note a member of this cluster " + i);
+ continue;
}
-
+ // This is the first member found, so this should be the member who is communicating
+ // with me.
+ if (i == 0) {
+ senderMember = member;
+ }
+ remoteGossipMembers.add(member);
}
mergeLists(gossipManager, senderMember, remoteGossipMembers);
- } catch (JSONException e) {
- GossipService.LOGGER
- .error("The received message is not well-formed JSON. The following message has been dropped:\n"
- + receivedMessage);
- System.out.println(e);
+ } catch (RuntimeException ex) {
+ GossipService.LOGGER.error("Unable to process message", ex);
}
-
} else {
GossipService.LOGGER
.error("The received message is not of the expected size, it has been dropped.");
http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/29f59664/src/main/java/com/google/code/gossip/manager/impl/SendMembersActiveGossipThread.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/google/code/gossip/manager/impl/SendMembersActiveGossipThread.java b/src/main/java/com/google/code/gossip/manager/impl/SendMembersActiveGossipThread.java
index 4e5f855..2259781 100644
--- a/src/main/java/com/google/code/gossip/manager/impl/SendMembersActiveGossipThread.java
+++ b/src/main/java/com/google/code/gossip/manager/impl/SendMembersActiveGossipThread.java
@@ -24,19 +24,33 @@ import java.net.InetAddress;
import java.nio.ByteBuffer;
import java.util.List;
-import org.json.JSONArray;
+import org.codehaus.jackson.map.ObjectMapper;
import com.google.code.gossip.GossipService;
import com.google.code.gossip.LocalGossipMember;
import com.google.code.gossip.manager.ActiveGossipThread;
import com.google.code.gossip.manager.GossipManager;
+import com.google.code.gossip.model.ActiveGossipMessage;
+import com.google.code.gossip.model.GossipMember;
abstract public class SendMembersActiveGossipThread extends ActiveGossipThread {
+ protected ObjectMapper om = new ObjectMapper();
+
public SendMembersActiveGossipThread(GossipManager gossipManager) {
super(gossipManager);
}
+ private GossipMember convert(LocalGossipMember member){
+ GossipMember gm = new GossipMember();
+ gm.setCluster(member.getClusterName());
+ gm.setHeartbeat(member.getHeartbeat());
+ gm.setHost(member.getHost());
+ gm.setId(member.getId());
+ gm.setPort(member.getPort());
+ return gm;
+ }
+
/**
* Performs the sending of the membership list, after we have incremented our own heartbeat.
*/
@@ -50,13 +64,12 @@ abstract public class SendMembersActiveGossipThread extends ActiveGossipThread {
try (DatagramSocket socket = new DatagramSocket()) {
socket.setSoTimeout(gossipManager.getSettings().getGossipInterval());
InetAddress dest = InetAddress.getByName(member.getHost());
- JSONArray jsonArray = new JSONArray();
- jsonArray.put(me.toJSONObject());
+ ActiveGossipMessage message = new ActiveGossipMessage();
+ message.getMembers().add(convert(me));
for (LocalGossipMember other : memberList) {
- jsonArray.put(other.toJSONObject());
- GossipService.LOGGER.debug(other);
+ message.getMembers().add(convert(other));
}
- byte[] json_bytes = jsonArray.toString().getBytes();
+ byte[] json_bytes = om.writeValueAsString(message).getBytes();
int packet_length = json_bytes.length;
if (packet_length < GossipManager.MAX_PACKET_SIZE) {
byte[] buf = createBuffer(packet_length, json_bytes);
http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/29f59664/src/main/java/com/google/code/gossip/model/ActiveGossipMessage.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/google/code/gossip/model/ActiveGossipMessage.java b/src/main/java/com/google/code/gossip/model/ActiveGossipMessage.java
new file mode 100644
index 0000000..d3516f5
--- /dev/null
+++ b/src/main/java/com/google/code/gossip/model/ActiveGossipMessage.java
@@ -0,0 +1,22 @@
+package com.google.code.gossip.model;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public class ActiveGossipMessage {
+
+ private List<GossipMember> members = new ArrayList<>();
+
+ public ActiveGossipMessage(){
+
+ }
+
+ public List<GossipMember> getMembers() {
+ return members;
+ }
+
+ public void setMembers(List<GossipMember> members) {
+ this.members = members;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/29f59664/src/main/java/com/google/code/gossip/model/GossipMember.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/google/code/gossip/model/GossipMember.java b/src/main/java/com/google/code/gossip/model/GossipMember.java
new file mode 100644
index 0000000..6c073b4
--- /dev/null
+++ b/src/main/java/com/google/code/gossip/model/GossipMember.java
@@ -0,0 +1,63 @@
+package com.google.code.gossip.model;
+
+public class GossipMember {
+
+ private String cluster;
+ private String host;
+ private Integer port;
+ private String id;
+ private Long heartbeat;
+
+ public GossipMember(){
+
+ }
+
+ /**
+ * Creates a member with every field populated.
+ *
+ * @param cluster name of the cluster the member belongs to
+ * @param host host of the member
+ * @param port port of the member
+ * @param id identifier of the member
+ * @param heartbeat heartbeat value of the member
+ */
+ public GossipMember(String cluster, String host, Integer port, String id, Long heartbeat){
+ this.cluster = cluster;
+ this.host = host;
+ this.port = port;
+ this.id = id;
+ // The heartbeat argument was previously ignored, leaving getHeartbeat() null for
+ // instances created through this constructor.
+ this.heartbeat = heartbeat;
+ }
+
+ public String getCluster() {
+ return cluster;
+ }
+
+ public void setCluster(String cluster) {
+ this.cluster = cluster;
+ }
+
+ public String getHost() {
+ return host;
+ }
+
+ public void setHost(String host) {
+ this.host = host;
+ }
+
+ public Integer getPort() {
+ return port;
+ }
+
+ public void setPort(Integer port) {
+ this.port = port;
+ }
+
+ public String getId() {
+ return id;
+ }
+
+ public void setId(String id) {
+ this.id = id;
+ }
+
+ public Long getHeartbeat() {
+ return heartbeat;
+ }
+
+ public void setHeartbeat(Long heartbeat) {
+ this.heartbeat = heartbeat;
+ }
+
+}
[09/12] incubator-gossip git commit: renamed packages from 'google'
to 'apache' and updated necessary imports
Posted by ec...@apache.org.
http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/968203e8/src/main/java/org/apache/gossip/GossipSettings.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/gossip/GossipSettings.java b/src/main/java/org/apache/gossip/GossipSettings.java
new file mode 100644
index 0000000..99b5807
--- /dev/null
+++ b/src/main/java/org/apache/gossip/GossipSettings.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gossip;
+
+/**
+ * In this object the settings used by the GossipService are held.
+ *
+ * @author harmenw
+ */
+public class GossipSettings {
+
+ /** Time between gossip'ing in ms. Default is 1 second. */
+ private int gossipInterval = 1000;
+
+ /** Time between cleanups in ms. Default is 10 seconds. */
+ private int cleanupInterval = 10000;
+
+ /**
+ * Construct GossipSettings with default settings.
+ */
+ public GossipSettings() {
+ }
+
+ /**
+ * Construct GossipSettings with given settings.
+ *
+ * @param gossipInterval
+ * The gossip interval in ms.
+ * @param cleanupInterval
+ * The cleanup interval in ms.
+ */
+ public GossipSettings(int gossipInterval, int cleanupInterval) {
+ this.gossipInterval = gossipInterval;
+ this.cleanupInterval = cleanupInterval;
+ }
+
+ /**
+ * Set the gossip interval. This is the time between two consecutive gossip messages being sent.
+ *
+ * @param gossipInterval
+ * The gossip interval in ms.
+ */
+ public void setGossipTimeout(int gossipInterval) {
+ this.gossipInterval = gossipInterval;
+ }
+
+ /**
+ * Set the cleanup interval. This is the time between the last heartbeat received from a member
+ * and when it will be marked as dead.
+ *
+ * @param cleanupInterval
+ * The cleanup interval in ms.
+ */
+ public void setCleanupInterval(int cleanupInterval) {
+ this.cleanupInterval = cleanupInterval;
+ }
+
+ /**
+ * Get the gossip interval.
+ *
+ * @return The gossip interval in ms.
+ */
+ public int getGossipInterval() {
+ return gossipInterval;
+ }
+
+ /**
+ * Get the cleanup interval.
+ *
+ * @return The cleanup interval in ms.
+ */
+ public int getCleanupInterval() {
+ return cleanupInterval;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/968203e8/src/main/java/org/apache/gossip/GossipTimeoutTimer.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/gossip/GossipTimeoutTimer.java b/src/main/java/org/apache/gossip/GossipTimeoutTimer.java
new file mode 100644
index 0000000..2fa09c0
--- /dev/null
+++ b/src/main/java/org/apache/gossip/GossipTimeoutTimer.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gossip;
+
+import java.util.Date;
+
+import javax.management.NotificationListener;
+import javax.management.timer.Timer;
+
+/**
+ * This object represents a timer for a gossip member. When the timer has elapsed without being
+ * reset in the meantime, it will inform the GossipService, which in turn will put the
+ * gossip member on the dead list, because it is apparently not alive anymore.
+ *
+ * @author joshclemm, harmenw
+ */
+public class GossipTimeoutTimer extends Timer {
+
+ private final long sleepTime;
+
+ private final LocalGossipMember source;
+
+ /**
+ * Constructor. Creates a reset-able timer that wakes up after millisecondsSleepTime.
+ *
+ * @param millisecondsSleepTime
+ * The time for this timer to wait before an event.
+ * @param notificationListener
+ * @param member
+ */
+ public GossipTimeoutTimer(long millisecondsSleepTime, NotificationListener notificationListener,
+ LocalGossipMember member) {
+ super();
+ sleepTime = millisecondsSleepTime;
+ source = member;
+ addNotificationListener(notificationListener, null, null);
+ }
+
+ /**
+ * @see javax.management.timer.Timer#start()
+ */
+ public void start() {
+ this.reset();
+ super.start();
+ }
+
+ /**
+ * Resets timer to start counting down from original time.
+ */
+ public void reset() {
+ removeAllNotifications();
+ setWakeupTime(sleepTime);
+ }
+
+ /**
+ * Adds a new wake-up time for this timer.
+ *
+ * @param milliseconds
+ */
+ private void setWakeupTime(long milliseconds) {
+ addNotification("type", "message", source, new Date(System.currentTimeMillis() + milliseconds));
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/968203e8/src/main/java/org/apache/gossip/LocalGossipMember.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/gossip/LocalGossipMember.java b/src/main/java/org/apache/gossip/LocalGossipMember.java
new file mode 100644
index 0000000..55ce257
--- /dev/null
+++ b/src/main/java/org/apache/gossip/LocalGossipMember.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gossip;
+
+import javax.management.NotificationListener;
+
+/**
+ * This object represents a gossip member with the properties known locally. These objects are stored
+ * in the local list of gossip members.
+ *
+ * @author harmenw
+ */
+public class LocalGossipMember extends GossipMember {
+ /** The timeout timer for this gossip member. */
+ private final transient GossipTimeoutTimer timeoutTimer;
+
+ /**
+ * Constructor.
+ *
+ * @param hostname
+ * The hostname or IP address.
+ * @param port
+ * The port number.
+ * @param id
+ * @param heartbeat
+ * The current heartbeat.
+ * @param notificationListener
+ * @param cleanupTimeout
+ * The cleanup timeout for this gossip member.
+ */
+ public LocalGossipMember(String clusterName, String hostname, int port, String id,
+ long heartbeat, NotificationListener notificationListener, int cleanupTimeout) {
+ super(clusterName, hostname, port, id, heartbeat);
+
+ timeoutTimer = new GossipTimeoutTimer(cleanupTimeout, notificationListener, this);
+ }
+
+ /**
+ * Start the timeout timer.
+ */
+ public void startTimeoutTimer() {
+ timeoutTimer.start();
+ }
+
+ /**
+ * Reset the timeout timer.
+ */
+ public void resetTimeoutTimer() {
+ timeoutTimer.reset();
+ }
+
+ public void disableTimer() {
+ timeoutTimer.removeAllNotifications();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/968203e8/src/main/java/org/apache/gossip/RemoteGossipMember.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/gossip/RemoteGossipMember.java b/src/main/java/org/apache/gossip/RemoteGossipMember.java
new file mode 100644
index 0000000..899da93
--- /dev/null
+++ b/src/main/java/org/apache/gossip/RemoteGossipMember.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gossip;
+
+/**
+ * The object represents a gossip member with the properties as received from a remote gossip
+ * member.
+ *
+ * @author harmenw
+ */
+public class RemoteGossipMember extends GossipMember {
+
+ /**
+ * Constructor.
+ *
+ * @param hostname
+ * The hostname or IP address.
+ * @param port
+ * The port number.
+ * @param heartbeat
+ * The current heartbeat.
+ */
+ public RemoteGossipMember(String clusterName, String hostname, int port, String id, long heartbeat) {
+ super(clusterName, hostname, port, id, heartbeat);
+ }
+
+ /**
+ * Construct a RemoteGossipMember whose heartbeat is initialised to the current system time in ms.
+ *
+ * @param hostname
+ * The hostname or IP address.
+ * @param port
+ * The port number.
+ */
+ public RemoteGossipMember(String clusterName, String hostname, int port, String id) {
+ super(clusterName, hostname, port, id, System.currentTimeMillis());
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/968203e8/src/main/java/org/apache/gossip/StartupSettings.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/gossip/StartupSettings.java b/src/main/java/org/apache/gossip/StartupSettings.java
new file mode 100644
index 0000000..176a79b
--- /dev/null
+++ b/src/main/java/org/apache/gossip/StartupSettings.java
@@ -0,0 +1,212 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gossip;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.FileReader;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.log4j.Logger;
+import org.json.JSONArray;
+import org.json.JSONException;
+import org.json.JSONObject;
+
+/**
+ * This object represents the settings used when starting the gossip service.
+ *
+ * @author harmenw
+ */
+public class StartupSettings {
+ private static final Logger log = Logger.getLogger(StartupSettings.class);
+
+ /** The id to use for the service */
+ private String id;
+
+ /** The port to start the gossip service on. */
+ private int port;
+
+ private String cluster;
+
+ /** The gossip settings used at startup. */
+ private final GossipSettings gossipSettings;
+
+ /** The list with gossip members to start with. */
+ private final List<GossipMember> gossipMembers;
+
+ /**
+ * Constructor.
+ *
+ * @param id
+ * The id to be used for this service
+ * @param port
+ * The port to start the service on.
+ * @param logLevel
+ * unused
+ */
+ public StartupSettings(String id, int port, int logLevel, String cluster) {
+ this(id, port, new GossipSettings(), cluster);
+ }
+
+ /**
+ * Constructor.
+ *
+ * @param id
+ * The id to be used for this service
+ * @param port
+ * The port to start the service on.
+ */
+ public StartupSettings(String id, int port, GossipSettings gossipSettings, String cluster) {
+ this.id = id;
+ this.port = port;
+ this.gossipSettings = gossipSettings;
+ this.setCluster(cluster);
+ gossipMembers = new ArrayList<>();
+ }
+
+ public void setCluster(String cluster) {
+ this.cluster = cluster;
+ }
+
+ public String getCluster() {
+ return cluster;
+ }
+
+ /**
+ * Set the id to be used for this service.
+ *
+ * @param id
+ * The id for this service.
+ */
+ public void setId(String id) {
+ this.id = id;
+ }
+
+ /**
+ * Get the id for this service.
+ *
+ * @return the service's id.
+ */
+ public String getId() {
+ return id;
+ }
+
+ /**
+ * Set the port of the gossip service.
+ *
+ * @param port
+ * The port for the gossip service.
+ */
+ public void setPort(int port) {
+ this.port = port;
+ }
+
+ /**
+ * Get the port for the gossip service.
+ *
+ * @return The port of the gossip service.
+ */
+ public int getPort() {
+ return port;
+ }
+
+ /**
+ * Get the GossipSettings.
+ *
+ * @return The GossipSettings object.
+ */
+ public GossipSettings getGossipSettings() {
+ return gossipSettings;
+ }
+
+ /**
+ * Add a gossip member to the list of members to start with.
+ *
+ * @param member
+ * The member to add.
+ */
+ public void addGossipMember(GossipMember member) {
+ gossipMembers.add(member);
+ }
+
+ /**
+ * Get the list with gossip members.
+ *
+ * @return The gossip members.
+ */
+ public List<GossipMember> getGossipMembers() {
+ return gossipMembers;
+ }
+
+ /**
+ * Parse the settings for the gossip service from a JSON file.
+ *
+ * @param jsonFile
+ * The file object which refers to the JSON config file.
+ * @return The StartupSettings object with the settings from the config file.
+ * @throws JSONException
+ * Thrown when the file is not well-formed JSON.
+ * @throws FileNotFoundException
+ * Thrown when the file cannot be found.
+ * @throws IOException
+ * Thrown when reading the file gives problems.
+ */
+ public static StartupSettings fromJSONFile(File jsonFile) throws JSONException,
+ FileNotFoundException, IOException {
+ // Read the file to a String.
+ StringBuffer buffer = new StringBuffer();
+ try (BufferedReader br = new BufferedReader(new FileReader(jsonFile)) ){
+ String line;
+ while ((line = br.readLine()) != null) {
+ buffer.append(line.trim());
+ }
+ }
+
+ JSONObject jsonObject = new JSONArray(buffer.toString()).getJSONObject(0);
+ int port = jsonObject.getInt("port");
+ String id = jsonObject.getString("id");
+ int gossipInterval = jsonObject.getInt("gossip_interval");
+ int cleanupInterval = jsonObject.getInt("cleanup_interval");
+ String cluster = jsonObject.getString("cluster");
+ if (cluster == null){
+ throw new IllegalArgumentException("cluster was null. It is required");
+ }
+ StartupSettings settings = new StartupSettings(id, port, new GossipSettings(gossipInterval,
+ cleanupInterval), cluster);
+
+ // Now iterate over the members from the config file and add them to the settings.
+ String configMembersDetails = "Config-members [";
+ JSONArray membersJSON = jsonObject.getJSONArray("members");
+ for (int i = 0; i < membersJSON.length(); i++) {
+ JSONObject memberJSON = membersJSON.getJSONObject(i);
+ RemoteGossipMember member = new RemoteGossipMember(memberJSON.getString("cluster"),
+ memberJSON.getString("host"), memberJSON.getInt("port"), "");
+ settings.addGossipMember(member);
+ configMembersDetails += member.getAddress();
+ if (i < (membersJSON.length() - 1))
+ configMembersDetails += ", ";
+ }
+ log.info(configMembersDetails + "]");
+
+ // Return the created settings object.
+ return settings;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/968203e8/src/main/java/org/apache/gossip/event/GossipListener.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/gossip/event/GossipListener.java b/src/main/java/org/apache/gossip/event/GossipListener.java
new file mode 100644
index 0000000..2e882f6
--- /dev/null
+++ b/src/main/java/org/apache/gossip/event/GossipListener.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gossip.event;
+
+import org.apache.gossip.GossipMember;
+
+public interface GossipListener {
+ void gossipEvent(GossipMember member, GossipState state);
+}
http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/968203e8/src/main/java/org/apache/gossip/event/GossipState.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/gossip/event/GossipState.java b/src/main/java/org/apache/gossip/event/GossipState.java
new file mode 100644
index 0000000..3b76c9e
--- /dev/null
+++ b/src/main/java/org/apache/gossip/event/GossipState.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gossip.event;
+
+public enum GossipState {
+ UP("up"), DOWN("down");
+ @SuppressWarnings("unused")
+ private final String state;
+
+ private GossipState(String state) {
+ this.state = state;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/968203e8/src/main/java/org/apache/gossip/examples/GossipExample.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/gossip/examples/GossipExample.java b/src/main/java/org/apache/gossip/examples/GossipExample.java
new file mode 100644
index 0000000..e953c77
--- /dev/null
+++ b/src/main/java/org/apache/gossip/examples/GossipExample.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gossip.examples;
+
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.gossip.GossipMember;
+import org.apache.gossip.GossipService;
+import org.apache.gossip.GossipSettings;
+import org.apache.gossip.RemoteGossipMember;
+
+/**
+ * This class is an example of how one could use the gossip service. Here we start multiple gossip
+ * clients on this host as specified in the config file.
+ *
+ * @author harmenw
+ */
+public class GossipExample extends Thread {
+ /** The number of clients to start. */
+ private static final int NUMBER_OF_CLIENTS = 4;
+
+ /**
+ * @param args
+ */
+ public static void main(String[] args) {
+ new GossipExample();
+ }
+
+ /**
+ * Constructor. This will start this thread.
+ */
+ public GossipExample() {
+ start();
+ }
+
+ /**
+ * @see java.lang.Thread#run()
+ */
+ public void run() {
+ try {
+ GossipSettings settings = new GossipSettings();
+
+ List<GossipService> clients = new ArrayList<>();
+
+ // Get my ip address.
+ String myIpAddress = InetAddress.getLocalHost().getHostAddress();
+
+ String cluster = "My Gossip Cluster";
+
+ // Create the gossip members and put them in a list and give them a port number starting with
+ // 2000.
+ List<GossipMember> startupMembers = new ArrayList<>();
+ for (int i = 0; i < NUMBER_OF_CLIENTS; ++i) {
+ startupMembers.add(new RemoteGossipMember(cluster, myIpAddress, 2000 + i, ""));
+ }
+
+ // Lets start the gossip clients.
+ // Start the clients, waiting cleaning-interval + 1 second between them which will show the
+ // dead list handling.
+ for (GossipMember member : startupMembers) {
+ GossipService gossipService = new GossipService(cluster, myIpAddress, member.getPort(), "",
+ startupMembers, settings, null);
+ clients.add(gossipService);
+ gossipService.start();
+ sleep(settings.getCleanupInterval() + 1000);
+ }
+
+ // After starting all gossip clients, first wait 10 seconds and then shut them down.
+ sleep(10000);
+ System.err.println("Going to shutdown all services...");
+ // Since they all run in the same virtual machine and share the same executor, if one is
+ // shutdown they will all stop.
+ clients.get(0).shutdown();
+
+ } catch (UnknownHostException e) {
+ e.printStackTrace();
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/968203e8/src/main/java/org/apache/gossip/manager/ActiveGossipThread.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/gossip/manager/ActiveGossipThread.java b/src/main/java/org/apache/gossip/manager/ActiveGossipThread.java
new file mode 100644
index 0000000..b966fcb
--- /dev/null
+++ b/src/main/java/org/apache/gossip/manager/ActiveGossipThread.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gossip.manager;
+
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.gossip.GossipService;
+import org.apache.gossip.LocalGossipMember;
+
+/**
+ * [The active thread: periodically send gossip request.] The class handles gossiping the membership
+ * list. This information is important to maintaining a common state among all the nodes, and is
+ * important for detecting failures.
+ */
+abstract public class ActiveGossipThread implements Runnable {
+
+ protected final GossipManager gossipManager;
+
+ private final AtomicBoolean keepRunning;
+
+ public ActiveGossipThread(GossipManager gossipManager) {
+ this.gossipManager = gossipManager;
+ this.keepRunning = new AtomicBoolean(true);
+ }
+
+ @Override
+ public void run() {
+ while (keepRunning.get()) {
+ try {
+ TimeUnit.MILLISECONDS.sleep(gossipManager.getSettings().getGossipInterval());
+ sendMembershipList(gossipManager.getMyself(), gossipManager.getMemberList());
+ } catch (InterruptedException e) {
+ GossipService.LOGGER.error(e);
+ keepRunning.set(false);
+ }
+ }
+ shutdown();
+ }
+
+ public void shutdown() {
+ keepRunning.set(false);
+ }
+
+ /**
+ * Performs the sending of the membership list, after we have incremented our own heartbeat.
+ */
+ abstract protected void sendMembershipList(LocalGossipMember me,
+ List<LocalGossipMember> memberList);
+
+ /**
+ * Abstract method which should be implemented by a subclass. This method should return a member
+ * of the list to gossip with.
+ *
+ * @param memberList
+ * The list of members which are stored in the local list of members.
+ * @return The chosen LocalGossipMember to gossip with.
+ */
+ abstract protected LocalGossipMember selectPartner(List<LocalGossipMember> memberList);
+}
http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/968203e8/src/main/java/org/apache/gossip/manager/GossipManager.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/gossip/manager/GossipManager.java b/src/main/java/org/apache/gossip/manager/GossipManager.java
new file mode 100644
index 0000000..80cadf7
--- /dev/null
+++ b/src/main/java/org/apache/gossip/manager/GossipManager.java
@@ -0,0 +1,217 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gossip.manager;
+
+import java.lang.reflect.InvocationTargetException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map.Entry;
+import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import javax.management.Notification;
+import javax.management.NotificationListener;
+
+import org.apache.log4j.Logger;
+
+import org.apache.gossip.GossipMember;
+import org.apache.gossip.GossipService;
+import org.apache.gossip.GossipSettings;
+import org.apache.gossip.LocalGossipMember;
+import org.apache.gossip.event.GossipListener;
+import org.apache.gossip.event.GossipState;
+
+/**
+ * Base class of a gossip node. It keeps the map of known members and their UP/DOWN state,
+ * instantiates the configured passive (receiving) and active (sending) gossip threads, and
+ * receives the timeout notifications that mark a member as dead.
+ */
+public abstract class GossipManager extends Thread implements NotificationListener {
+
+  public static final Logger LOGGER = Logger.getLogger(GossipManager.class);
+
+  /** Upper bound, in bytes, for the JSON payload of a gossip packet. */
+  public static final int MAX_PACKET_SIZE = 102400;
+
+  /** Every known member except this node itself, mapped to its current state. */
+  private final ConcurrentSkipListMap<LocalGossipMember, GossipState> members;
+
+  private final LocalGossipMember me;
+
+  private final GossipSettings settings;
+
+  private final AtomicBoolean gossipServiceRunning;
+
+  private final Class<? extends PassiveGossipThread> passiveGossipThreadClass;
+
+  private final Class<? extends ActiveGossipThread> activeGossipThreadClass;
+
+  /** Optional callback for state changes; may be null. */
+  private final GossipListener listener;
+
+  private ActiveGossipThread activeGossipThread;
+
+  private PassiveGossipThread passiveGossipThread;
+
+  private ExecutorService gossipThreadExecutor;
+
+  /**
+   * @param passiveGossipThreadClass
+   *          Receiver implementation; needs a public constructor taking a GossipManager.
+   * @param activeGossipThreadClass
+   *          Sender implementation; needs a public constructor taking a GossipManager.
+   * @param cluster
+   *          Name of the cluster this node belongs to.
+   * @param address
+   *          Host of this node.
+   * @param port
+   *          Port of this node.
+   * @param id
+   *          Id of this node.
+   * @param settings
+   *          Gossip settings (intervals).
+   * @param gossipMembers
+   *          Startup members; entries equal to this node are skipped.
+   * @param listener
+   *          Receives UP/DOWN events; may be null.
+   */
+  public GossipManager(Class<? extends PassiveGossipThread> passiveGossipThreadClass,
+          Class<? extends ActiveGossipThread> activeGossipThreadClass, String cluster,
+          String address, int port, String id, GossipSettings settings,
+          List<GossipMember> gossipMembers, GossipListener listener) {
+    this.passiveGossipThreadClass = passiveGossipThreadClass;
+    this.activeGossipThreadClass = activeGossipThreadClass;
+    this.settings = settings;
+    me = new LocalGossipMember(cluster, address, port, id, System.currentTimeMillis(), this,
+            settings.getCleanupInterval());
+    members = new ConcurrentSkipListMap<>();
+    for (GossipMember startupMember : gossipMembers) {
+      if (!startupMember.equals(me)) {
+        LocalGossipMember member = new LocalGossipMember(startupMember.getClusterName(),
+                startupMember.getHost(), startupMember.getPort(), startupMember.getId(),
+                System.currentTimeMillis(), this, settings.getCleanupInterval());
+        members.put(member, GossipState.UP);
+        GossipService.LOGGER.debug(member);
+      }
+    }
+    gossipThreadExecutor = Executors.newCachedThreadPool();
+    gossipServiceRunning = new AtomicBoolean(true);
+    this.listener = listener;
+    Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
+      public void run() {
+        GossipService.LOGGER.debug("Service has been shutdown...");
+      }
+    }));
+  }
+
+  /**
+   * All timers associated with a member will trigger this method when they go off. The member
+   * carried as user data of the notification is marked DOWN and the listener, if any, is told.
+   */
+  @Override
+  public void handleNotification(Notification notification, Object handback) {
+    LocalGossipMember deadMember = (LocalGossipMember) notification.getUserData();
+    GossipService.LOGGER.debug("Dead member detected: " + deadMember);
+    members.put(deadMember, GossipState.DOWN);
+    if (listener != null) {
+      listener.gossipEvent(deadMember, GossipState.DOWN);
+    }
+  }
+
+  /**
+   * Replaces a known member by {@code m} and marks it UP. The timers of all existing entries with
+   * the same id are disabled first, so that the replaced entry cannot fire a timeout later.
+   *
+   * @param m
+   *          The member that came back.
+   */
+  public void revivieMember(LocalGossipMember m) {
+    for (Entry<LocalGossipMember, GossipState> it : this.members.entrySet()) {
+      if (it.getKey().getId().equals(m.getId())) {
+        it.getKey().disableTimer();
+      }
+    }
+    // Remove before put: put() on an existing key would keep the old key object in the map.
+    members.remove(m);
+    members.put(m, GossipState.UP);
+    if (listener != null) {
+      listener.gossipEvent(m, GossipState.UP);
+    }
+  }
+
+  /**
+   * Stores {@code m} with state UP and notifies the listener, if any.
+   *
+   * @param m
+   *          The member to add.
+   */
+  public void createOrRevivieMember(LocalGossipMember m) {
+    members.put(m, GossipState.UP);
+    if (listener != null) {
+      listener.gossipEvent(m, GossipState.UP);
+    }
+  }
+
+  public GossipSettings getSettings() {
+    return settings;
+  }
+
+  /**
+   *
+   * @return a read only list of members found in the UP state
+   */
+  public List<LocalGossipMember> getMemberList() {
+    return membersInState(GossipState.UP);
+  }
+
+  public LocalGossipMember getMyself() {
+    return me;
+  }
+
+  /**
+   *
+   * @return a read only list of members found in the DOWN state
+   */
+  public List<LocalGossipMember> getDeadList() {
+    return membersInState(GossipState.DOWN);
+  }
+
+  /**
+   * Builds a read only snapshot of all members that currently have the given state.
+   */
+  private List<LocalGossipMember> membersInState(GossipState state) {
+    List<LocalGossipMember> matching = new ArrayList<>();
+    for (Entry<LocalGossipMember, GossipState> entry : members.entrySet()) {
+      if (state.equals(entry.getValue())) {
+        matching.add(entry.getKey());
+      }
+    }
+    return Collections.unmodifiableList(matching);
+  }
+
+  /**
+   * Starts the client. Specifically, start the various cycles for this protocol. Start the gossip
+   * thread and start the receiver thread. The method then stays in a polling loop until
+   * {@link #shutdown()} is called.
+   */
+  public void run() {
+    for (LocalGossipMember member : members.keySet()) {
+      if (member != me) {
+        member.startTimeoutTimer();
+      }
+    }
+    try {
+      passiveGossipThread = passiveGossipThreadClass.getConstructor(GossipManager.class)
+              .newInstance(this);
+      gossipThreadExecutor.execute(passiveGossipThread);
+      activeGossipThread = activeGossipThreadClass.getConstructor(GossipManager.class)
+              .newInstance(this);
+      gossipThreadExecutor.execute(activeGossipThread);
+    } catch (InstantiationException | IllegalAccessException | IllegalArgumentException
+            | InvocationTargetException | NoSuchMethodException | SecurityException e1) {
+      throw new RuntimeException(e1);
+    }
+    GossipService.LOGGER.debug("The GossipService is started.");
+    while (gossipServiceRunning.get()) {
+      try {
+        // TODO
+        TimeUnit.MILLISECONDS.sleep(1);
+      } catch (InterruptedException e) {
+        GossipService.LOGGER.warn("The GossipClient was interrupted.");
+      }
+    }
+  }
+
+  /**
+   * Shutdown the gossip service. Stops the polling loop of {@link #run()}, asks both gossip
+   * threads to stop and waits up to one second for the executor to terminate.
+   */
+  public void shutdown() {
+    gossipServiceRunning.set(false);
+    gossipThreadExecutor.shutdown();
+    if (passiveGossipThread != null) {
+      passiveGossipThread.shutdown();
+    }
+    if (activeGossipThread != null) {
+      activeGossipThread.shutdown();
+    }
+    try {
+      boolean result = gossipThreadExecutor.awaitTermination(1000, TimeUnit.MILLISECONDS);
+      if (!result) {
+        LOGGER.error("executor shutdown timed out");
+      }
+    } catch (InterruptedException e) {
+      LOGGER.error("Interrupted while waiting for the gossip executor to terminate", e);
+      // Restore the interrupt flag for the caller of shutdown().
+      Thread.currentThread().interrupt();
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/968203e8/src/main/java/org/apache/gossip/manager/PassiveGossipThread.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/gossip/manager/PassiveGossipThread.java b/src/main/java/org/apache/gossip/manager/PassiveGossipThread.java
new file mode 100644
index 0000000..bd7354e
--- /dev/null
+++ b/src/main/java/org/apache/gossip/manager/PassiveGossipThread.java
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gossip.manager;
+
+import java.io.IOException;
+import java.net.DatagramPacket;
+import java.net.DatagramSocket;
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+import java.net.SocketException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.gossip.GossipMember;
+import org.apache.gossip.GossipService;
+import org.apache.gossip.model.ActiveGossipMessage;
+import org.apache.log4j.Logger;
+import org.codehaus.jackson.map.ObjectMapper;
+import org.apache.gossip.RemoteGossipMember;
+
+/**
+ * [The passive thread: reply to incoming gossip request.] This class handles the passive cycle,
+ * where this client has received an incoming message. For now, this message is always the
+ * membership list, but if you choose to gossip additional information, you will need some logic to
+ * determine the incoming message.
+ */
+abstract public class PassiveGossipThread implements Runnable {
+
+ public static final Logger LOGGER = Logger.getLogger(PassiveGossipThread.class);
+
+ /** The socket used for the passive thread of the gossip service. */
+ private final DatagramSocket server;
+
+ private final GossipManager gossipManager;
+
+ private final AtomicBoolean keepRunning;
+
+ private final String cluster;
+
+ private final ObjectMapper MAPPER = new ObjectMapper();
+
+ public PassiveGossipThread(GossipManager gossipManager) {
+ this.gossipManager = gossipManager;
+ try {
+ SocketAddress socketAddress = new InetSocketAddress(gossipManager.getMyself().getHost(),
+ gossipManager.getMyself().getPort());
+ server = new DatagramSocket(socketAddress);
+ GossipService.LOGGER.debug("Gossip service successfully initialized on port "
+ + gossipManager.getMyself().getPort());
+ GossipService.LOGGER.debug("I am " + gossipManager.getMyself());
+ cluster = gossipManager.getMyself().getClusterName();
+ if (cluster == null){
+ throw new IllegalArgumentException("cluster was null");
+ }
+ } catch (SocketException ex) {
+ GossipService.LOGGER.warn(ex);
+ throw new RuntimeException(ex);
+ }
+ keepRunning = new AtomicBoolean(true);
+ }
+
+ @Override
+ public void run() {
+ while (keepRunning.get()) {
+ try {
+ byte[] buf = new byte[server.getReceiveBufferSize()];
+ DatagramPacket p = new DatagramPacket(buf, buf.length);
+ server.receive(p);
+ int packet_length = 0;
+ for (int i = 0; i < 4; i++) {
+ int shift = (4 - 1 - i) * 8;
+ packet_length += (buf[i] & 0x000000FF) << shift;
+ }
+ if (packet_length <= GossipManager.MAX_PACKET_SIZE) {
+ byte[] json_bytes = new byte[packet_length];
+ for (int i = 0; i < packet_length; i++) {
+ json_bytes[i] = buf[i + 4];
+ }
+ if (GossipService.LOGGER.isDebugEnabled()){
+ String receivedMessage = new String(json_bytes);
+ GossipService.LOGGER.debug("Received message (" + packet_length + " bytes): "
+ + receivedMessage);
+ }
+ try {
+ List<GossipMember> remoteGossipMembers = new ArrayList<>();
+ RemoteGossipMember senderMember = null;
+ ActiveGossipMessage activeGossipMessage = MAPPER.readValue(json_bytes,
+ ActiveGossipMessage.class);
+ for (int i = 0; i < activeGossipMessage.getMembers().size(); i++) {
+ RemoteGossipMember member = new RemoteGossipMember(
+ activeGossipMessage.getMembers().get(i).getCluster(),
+ activeGossipMessage.getMembers().get(i).getHost(),
+ activeGossipMessage.getMembers().get(i).getPort(),
+ activeGossipMessage.getMembers().get(i).getId(),
+ activeGossipMessage.getMembers().get(i).getHeartbeat());
+ if (!(member.getClusterName().equals(cluster))){
+ GossipService.LOGGER.warn("Note a member of this cluster " + i);
+ continue;
+ }
+ // This is the first member found, so this should be the member who is communicating
+ // with me.
+ if (i == 0) {
+ senderMember = member;
+ }
+ remoteGossipMembers.add(member);
+ }
+ mergeLists(gossipManager, senderMember, remoteGossipMembers);
+ } catch (RuntimeException ex) {
+ GossipService.LOGGER.error("Unable to process message", ex);
+ }
+ } else {
+ GossipService.LOGGER
+ .error("The received message is not of the expected size, it has been dropped.");
+ }
+
+ } catch (IOException e) {
+ GossipService.LOGGER.error(e);
+ System.out.println(e);
+ keepRunning.set(false);
+ }
+ }
+ shutdown();
+ }
+
+ public void shutdown() {
+ try {
+ server.close();
+ } catch (RuntimeException ex) {
+ }
+ }
+
+ /**
+ * Abstract method for merging the local and remote list.
+ *
+ * @param gossipManager
+ * The GossipManager for retrieving the local members and dead members list.
+ * @param senderMember
+ * The member who is sending this list, this could be used to send a response if the
+ * remote list contains out-dated information.
+ * @param remoteList
+ * The list of members known at the remote side.
+ */
+ abstract protected void mergeLists(GossipManager gossipManager, RemoteGossipMember senderMember,
+ List<GossipMember> remoteList);
+}
+
+/*
+ * Leftover notes on the packet length check: check whether the packet is smaller than the maximal
+ * packet length. A packet larger than this cannot have been sent by a GossipService, since the
+ * length is checked before the message is sent. A larger length can normally only occur when the
+ * list of members is very big, or when the packet is malformed and the first 4 bytes are no
+ * longer the right int. For this reason we disregard the message.
+ */
http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/968203e8/src/main/java/org/apache/gossip/manager/impl/OnlyProcessReceivedPassiveGossipThread.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/gossip/manager/impl/OnlyProcessReceivedPassiveGossipThread.java b/src/main/java/org/apache/gossip/manager/impl/OnlyProcessReceivedPassiveGossipThread.java
new file mode 100644
index 0000000..edf21f3
--- /dev/null
+++ b/src/main/java/org/apache/gossip/manager/impl/OnlyProcessReceivedPassiveGossipThread.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gossip.manager.impl;
+
+import java.util.List;
+
+import org.apache.gossip.GossipMember;
+import org.apache.gossip.GossipService;
+import org.apache.gossip.LocalGossipMember;
+import org.apache.gossip.RemoteGossipMember;
+import org.apache.gossip.manager.GossipManager;
+import org.apache.gossip.manager.PassiveGossipThread;
+
+public class OnlyProcessReceivedPassiveGossipThread extends PassiveGossipThread {
+
+ public OnlyProcessReceivedPassiveGossipThread(GossipManager gossipManager) {
+ super(gossipManager);
+ }
+
+ /**
+ * Merge remote list (received from peer), and our local member list. Simply, we must update the
+ * heartbeats that the remote list has with our list. Also, some additional logic is needed to
+ * make sure we have not timed out a member and then immediately received a list with that member.
+ *
+ * @param gossipManager
+ * @param senderMember
+ * @param remoteList
+ */
+ protected void mergeLists(GossipManager gossipManager, RemoteGossipMember senderMember,
+ List<GossipMember> remoteList) {
+
+ // if the person sending to us is in the dead list consider them up
+ for (LocalGossipMember i : gossipManager.getDeadList()) {
+ if (i.getId().equals(senderMember.getId())) {
+ System.out.println(gossipManager.getMyself() + " caught a live one!");
+ LocalGossipMember newLocalMember = new LocalGossipMember(senderMember.getClusterName(),
+ senderMember.getHost(), senderMember.getPort(), senderMember.getId(),
+ senderMember.getHeartbeat(), gossipManager, gossipManager.getSettings()
+ .getCleanupInterval());
+ gossipManager.revivieMember(newLocalMember);
+ newLocalMember.startTimeoutTimer();
+ }
+ }
+ for (GossipMember remoteMember : remoteList) {
+ if (remoteMember.getId().equals(gossipManager.getMyself().getId())) {
+ continue;
+ }
+ if (gossipManager.getMemberList().contains(remoteMember)) {
+ LocalGossipMember localMember = gossipManager.getMemberList().get(
+ gossipManager.getMemberList().indexOf(remoteMember));
+ if (remoteMember.getHeartbeat() > localMember.getHeartbeat()) {
+ localMember.setHeartbeat(remoteMember.getHeartbeat());
+ localMember.resetTimeoutTimer();
+ }
+ } else if (!gossipManager.getMemberList().contains(remoteMember)
+ && !gossipManager.getDeadList().contains(remoteMember)) {
+ LocalGossipMember newLocalMember = new LocalGossipMember(remoteMember.getClusterName(),
+ remoteMember.getHost(), remoteMember.getPort(), remoteMember.getId(),
+ remoteMember.getHeartbeat(), gossipManager, gossipManager.getSettings()
+ .getCleanupInterval());
+ gossipManager.createOrRevivieMember(newLocalMember);
+ newLocalMember.startTimeoutTimer();
+ } else {
+ if (gossipManager.getDeadList().contains(remoteMember)) {
+ LocalGossipMember localDeadMember = gossipManager.getDeadList().get(
+ gossipManager.getDeadList().indexOf(remoteMember));
+ if (remoteMember.getHeartbeat() > localDeadMember.getHeartbeat()) {
+ LocalGossipMember newLocalMember = new LocalGossipMember(remoteMember.getClusterName(),
+ remoteMember.getHost(), remoteMember.getPort(), remoteMember.getId(),
+ remoteMember.getHeartbeat(), gossipManager, gossipManager.getSettings()
+ .getCleanupInterval());
+ gossipManager.revivieMember(newLocalMember);
+ newLocalMember.startTimeoutTimer();
+ GossipService.LOGGER.debug("Removed remote member " + remoteMember.getAddress()
+ + " from dead list and added to local member list.");
+ } else {
+ GossipService.LOGGER.debug("me " + gossipManager.getMyself());
+ GossipService.LOGGER.debug("sender " + senderMember);
+ GossipService.LOGGER.debug("remote " + remoteList);
+ GossipService.LOGGER.debug("live " + gossipManager.getMemberList());
+ GossipService.LOGGER.debug("dead " + gossipManager.getDeadList());
+ }
+ } else {
+ GossipService.LOGGER.debug("me " + gossipManager.getMyself());
+ GossipService.LOGGER.debug("sender " + senderMember);
+ GossipService.LOGGER.debug("remote " + remoteList);
+ GossipService.LOGGER.debug("live " + gossipManager.getMemberList());
+ GossipService.LOGGER.debug("dead " + gossipManager.getDeadList());
+ // throw new IllegalArgumentException("wtf");
+ }
+ }
+ }
+ }
+
+}
+
+/**
+ * old comment section: // If a member is restarted the heartbeat will restart from 1, so we should
+ * check // that here. // So a member can become from the dead when it is either larger than a
+ * previous // heartbeat (due to network failure) // or when the heartbeat is 1 (after a restart of
+ * the service). // TODO: What if the first message of a gossip service is sent to a dead node? The
+ * // second member will receive a heartbeat of two. // TODO: The above does happen. Maybe a special
+ * message for a revived member? // TODO: Or maybe when a member is declared dead for more than //
+ * _settings.getCleanupInterval() ms, reset the heartbeat to 0. // It will then accept a revived
+ * member. // The above is now handle by checking whether the heartbeat differs //
+ * _settings.getCleanupInterval(), it must be restarted.
+ */
+
+/*
+ * // The remote member is back from the dead. // Remove it from the dead list. //
+ * gossipManager.getDeadList().remove(localDeadMember); // Add it as a new member and add it to the
+ * member list.
+ */
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/968203e8/src/main/java/org/apache/gossip/manager/impl/SendMembersActiveGossipThread.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/gossip/manager/impl/SendMembersActiveGossipThread.java b/src/main/java/org/apache/gossip/manager/impl/SendMembersActiveGossipThread.java
new file mode 100644
index 0000000..16d0d32
--- /dev/null
+++ b/src/main/java/org/apache/gossip/manager/impl/SendMembersActiveGossipThread.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gossip.manager.impl;
+
+import java.io.IOException;
+import java.net.DatagramPacket;
+import java.net.DatagramSocket;
+import java.net.InetAddress;
+import java.nio.ByteBuffer;
+import java.util.List;
+
+import org.apache.gossip.GossipService;
+import org.apache.gossip.LocalGossipMember;
+import org.apache.gossip.manager.ActiveGossipThread;
+import org.apache.gossip.manager.GossipManager;
+import org.apache.gossip.model.ActiveGossipMessage;
+import org.apache.gossip.model.GossipMember;
+import org.codehaus.jackson.map.ObjectMapper;
+
+/**
+ * Active gossip thread that sends the local membership list, as JSON in a single UDP datagram, to
+ * the partner chosen by {@code selectPartner}. A datagram is a 4 byte big-endian length followed
+ * by that many bytes of UTF-8 encoded JSON.
+ */
+abstract public class SendMembersActiveGossipThread extends ActiveGossipThread {
+
+  protected ObjectMapper om = new ObjectMapper();
+
+  public SendMembersActiveGossipThread(GossipManager gossipManager) {
+    super(gossipManager);
+  }
+
+  /**
+   * Copies the fields of a local member into the model object that is serialized to JSON.
+   */
+  private GossipMember convert(LocalGossipMember member){
+    GossipMember gm = new GossipMember();
+    gm.setCluster(member.getClusterName());
+    gm.setHeartbeat(member.getHeartbeat());
+    gm.setHost(member.getHost());
+    gm.setId(member.getId());
+    gm.setPort(member.getPort());
+    return gm;
+  }
+
+  /**
+   * Performs the sending of the membership list, after we have incremented our own heartbeat.
+   * Nothing is sent when no partner can be selected or when the message is too large.
+   *
+   * @param me
+   *          The local member; it is always the first entry of the message.
+   * @param memberList
+   *          The members to choose the partner from and to include in the message.
+   */
+  protected void sendMembershipList(LocalGossipMember me, List<LocalGossipMember> memberList) {
+    GossipService.LOGGER.debug("Send sendMembershipList() is called.");
+    me.setHeartbeat(System.currentTimeMillis());
+    LocalGossipMember member = selectPartner(memberList);
+    if (member == null) {
+      return;
+    }
+    try (DatagramSocket socket = new DatagramSocket()) {
+      socket.setSoTimeout(gossipManager.getSettings().getGossipInterval());
+      InetAddress dest = InetAddress.getByName(member.getHost());
+      ActiveGossipMessage message = new ActiveGossipMessage();
+      message.getMembers().add(convert(me));
+      for (LocalGossipMember other : memberList) {
+        message.getMembers().add(convert(other));
+      }
+      // Encode explicitly as UTF-8: the receiver hands the raw bytes to the JSON parser, so the
+      // platform default charset must not leak into the wire format.
+      byte[] json_bytes = om.writeValueAsString(message).getBytes(
+              java.nio.charset.StandardCharsets.UTF_8);
+      int packet_length = json_bytes.length;
+      if (packet_length < GossipManager.MAX_PACKET_SIZE) {
+        byte[] buf = createBuffer(packet_length, json_bytes);
+        DatagramPacket datagramPacket = new DatagramPacket(buf, buf.length, dest, member.getPort());
+        socket.send(datagramPacket);
+      } else {
+        GossipService.LOGGER.error("The length of the to be send message is too large ("
+                + packet_length + " > " + GossipManager.MAX_PACKET_SIZE + ").");
+      }
+    } catch (IOException e1) {
+      GossipService.LOGGER.warn(e1);
+    }
+  }
+
+  /**
+   * Builds the datagram payload: the length as a 4 byte big-endian integer, then the JSON bytes.
+   */
+  private byte[] createBuffer(int packetLength, byte[] jsonBytes) {
+    // A freshly allocated ByteBuffer is big-endian, which is the byte order of the length prefix.
+    ByteBuffer byteBuffer = ByteBuffer.allocate(4 + jsonBytes.length);
+    byteBuffer.putInt(packetLength);
+    byteBuffer.put(jsonBytes);
+    return byteBuffer.array();
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/968203e8/src/main/java/org/apache/gossip/manager/random/RandomActiveGossipThread.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/gossip/manager/random/RandomActiveGossipThread.java b/src/main/java/org/apache/gossip/manager/random/RandomActiveGossipThread.java
new file mode 100644
index 0000000..23a41f5
--- /dev/null
+++ b/src/main/java/org/apache/gossip/manager/random/RandomActiveGossipThread.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gossip.manager.random;
+
+import java.util.List;
+import java.util.Random;
+
+import org.apache.gossip.GossipService;
+import org.apache.gossip.LocalGossipMember;
+import org.apache.gossip.manager.GossipManager;
+import org.apache.gossip.manager.impl.SendMembersActiveGossipThread;
+
+/**
+ * Active gossip thread that picks the member to gossip with at random.
+ */
+public class RandomActiveGossipThread extends SendMembersActiveGossipThread {
+
+  /** Source of randomness for the partner selection. */
+  private final Random random = new Random();
+
+  public RandomActiveGossipThread(GossipManager gossipManager) {
+    super(gossipManager);
+  }
+
+  /**
+   * [The selectToSend() function.] Picks a random peer from the given membership list.
+   *
+   * @param memberList
+   *          The candidates to choose from.
+   * @return A randomly chosen member, or null when the list is empty.
+   */
+  protected LocalGossipMember selectPartner(List<LocalGossipMember> memberList) {
+    int candidates = memberList.size();
+    if (candidates <= 0) {
+      GossipService.LOGGER.debug("I am alone in this world.");
+      return null;
+    }
+    return memberList.get(random.nextInt(candidates));
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/968203e8/src/main/java/org/apache/gossip/manager/random/RandomGossipManager.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/gossip/manager/random/RandomGossipManager.java b/src/main/java/org/apache/gossip/manager/random/RandomGossipManager.java
new file mode 100644
index 0000000..0122610
--- /dev/null
+++ b/src/main/java/org/apache/gossip/manager/random/RandomGossipManager.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gossip.manager.random;
+
+import org.apache.gossip.GossipMember;
+import org.apache.gossip.GossipSettings;
+import org.apache.gossip.event.GossipListener;
+import org.apache.gossip.manager.GossipManager;
+import org.apache.gossip.manager.impl.OnlyProcessReceivedPassiveGossipThread;
+
+import java.util.List;
+
+public class RandomGossipManager extends GossipManager {
+ public RandomGossipManager(String cluster, String address, int port, String id,
+ GossipSettings settings, List<GossipMember> gossipMembers, GossipListener listener) {
+ super(OnlyProcessReceivedPassiveGossipThread.class, RandomActiveGossipThread.class, cluster,
+ address, port, id, settings, gossipMembers, listener);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/968203e8/src/main/java/org/apache/gossip/model/ActiveGossipMessage.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/gossip/model/ActiveGossipMessage.java b/src/main/java/org/apache/gossip/model/ActiveGossipMessage.java
new file mode 100644
index 0000000..ac940d8
--- /dev/null
+++ b/src/main/java/org/apache/gossip/model/ActiveGossipMessage.java
@@ -0,0 +1,22 @@
+package org.apache.gossip.model;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public class ActiveGossipMessage {
+
+ private List<GossipMember> members = new ArrayList<>();
+
+ public ActiveGossipMessage(){
+
+ }
+
+ public List<GossipMember> getMembers() {
+ return members;
+ }
+
+ public void setMembers(List<GossipMember> members) {
+ this.members = members;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/968203e8/src/main/java/org/apache/gossip/model/GossipMember.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/gossip/model/GossipMember.java b/src/main/java/org/apache/gossip/model/GossipMember.java
new file mode 100644
index 0000000..8dc6bf7
--- /dev/null
+++ b/src/main/java/org/apache/gossip/model/GossipMember.java
@@ -0,0 +1,63 @@
+package org.apache.gossip.model;
+
+public class GossipMember {
+
+ private String cluster;
+ private String host;
+ private Integer port;
+ private String id;
+ private Long heartbeat;
+
+ public GossipMember(){
+
+ }
+
+ public GossipMember(String cluster, String host, Integer port, String id, Long heartbeat){
+ this.cluster=cluster;
+ this.host= host;
+ this.port = port;
+ this.id = id;
+ this.heartbeat = heartbeat; // was never assigned, so getHeartbeat() returned null after construction
+ }
+
+ public String getCluster() {
+ return cluster;
+ }
+
+ public void setCluster(String cluster) {
+ this.cluster = cluster;
+ }
+
+ public String getHost() {
+ return host;
+ }
+
+ public void setHost(String host) {
+ this.host = host;
+ }
+
+ public Integer getPort() {
+ return port;
+ }
+
+ public void setPort(Integer port) {
+ this.port = port;
+ }
+
+ public String getId() {
+ return id;
+ }
+
+ public void setId(String id) {
+ this.id = id;
+ }
+
+ public Long getHeartbeat() {
+ return heartbeat;
+ }
+
+ public void setHeartbeat(Long heartbeat) {
+ this.heartbeat = heartbeat;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/968203e8/src/test/java/io/teknek/gossip/ShutdownDeadtimeTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/io/teknek/gossip/ShutdownDeadtimeTest.java b/src/test/java/io/teknek/gossip/ShutdownDeadtimeTest.java
index af30eb7..2d8190b 100644
--- a/src/test/java/io/teknek/gossip/ShutdownDeadtimeTest.java
+++ b/src/test/java/io/teknek/gossip/ShutdownDeadtimeTest.java
@@ -31,12 +31,12 @@ import java.util.concurrent.TimeUnit;
import org.apache.log4j.Logger;
import org.junit.Test;
-import com.google.code.gossip.GossipMember;
-import com.google.code.gossip.GossipService;
-import com.google.code.gossip.GossipSettings;
-import com.google.code.gossip.RemoteGossipMember;
-import com.google.code.gossip.event.GossipListener;
-import com.google.code.gossip.event.GossipState;
+import org.apache.gossip.GossipMember;
+import org.apache.gossip.GossipService;
+import org.apache.gossip.GossipSettings;
+import org.apache.gossip.RemoteGossipMember;
+import org.apache.gossip.event.GossipListener;
+import org.apache.gossip.event.GossipState;
public class ShutdownDeadtimeTest {
http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/968203e8/src/test/java/io/teknek/gossip/StartupSettingsTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/io/teknek/gossip/StartupSettingsTest.java b/src/test/java/io/teknek/gossip/StartupSettingsTest.java
index bf6710e..aa4e404 100644
--- a/src/test/java/io/teknek/gossip/StartupSettingsTest.java
+++ b/src/test/java/io/teknek/gossip/StartupSettingsTest.java
@@ -17,10 +17,10 @@
*/
package io.teknek.gossip;
-import com.google.code.gossip.GossipMember;
-import com.google.code.gossip.GossipService;
-import com.google.code.gossip.GossipSettings;
-import com.google.code.gossip.StartupSettings;
+import org.apache.gossip.GossipMember;
+import org.apache.gossip.GossipService;
+import org.apache.gossip.GossipSettings;
+import org.apache.gossip.StartupSettings;
import org.apache.log4j.Logger;
import org.json.JSONException;
import org.junit.Test;
http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/968203e8/src/test/java/io/teknek/gossip/TenNodeThreeSeedTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/io/teknek/gossip/TenNodeThreeSeedTest.java b/src/test/java/io/teknek/gossip/TenNodeThreeSeedTest.java
index 277d0fe..4e731ae 100644
--- a/src/test/java/io/teknek/gossip/TenNodeThreeSeedTest.java
+++ b/src/test/java/io/teknek/gossip/TenNodeThreeSeedTest.java
@@ -30,12 +30,12 @@ import java.util.concurrent.TimeUnit;
import org.apache.log4j.Logger;
import org.junit.Test;
-import com.google.code.gossip.GossipMember;
-import com.google.code.gossip.GossipService;
-import com.google.code.gossip.GossipSettings;
-import com.google.code.gossip.RemoteGossipMember;
-import com.google.code.gossip.event.GossipListener;
-import com.google.code.gossip.event.GossipState;
+import org.apache.gossip.GossipMember;
+import org.apache.gossip.GossipService;
+import org.apache.gossip.GossipSettings;
+import org.apache.gossip.RemoteGossipMember;
+import org.apache.gossip.event.GossipListener;
+import org.apache.gossip.event.GossipState;
public class TenNodeThreeSeedTest {
private static final Logger log = Logger.getLogger( TenNodeThreeSeedTest.class );
[10/12] incubator-gossip git commit: renamed packages from 'google'
to 'apache' and updated necessary imports
Posted by ec...@apache.org.
renamed packages from 'google' to 'apache' and updated necessary imports
Project: http://git-wip-us.apache.org/repos/asf/incubator-gossip/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gossip/commit/968203e8
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gossip/tree/968203e8
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gossip/diff/968203e8
Branch: refs/heads/master
Commit: 968203e8a025bc8564127e8188eeacfc4d8332c0
Parents: 6a35db7
Author: Dorian Ellerbe <do...@localhost-fedora23.fios-router.home>
Authored: Sun May 29 00:50:32 2016 -0400
Committer: Edward Capriolo <ed...@gmail.com>
Committed: Tue Jun 7 22:52:54 2016 -0400
----------------------------------------------------------------------
.../com/google/code/gossip/GossipMember.java | 168 --------------
.../com/google/code/gossip/GossipRunner.java | 60 -----
.../com/google/code/gossip/GossipService.java | 87 --------
.../com/google/code/gossip/GossipSettings.java | 90 --------
.../google/code/gossip/GossipTimeoutTimer.java | 78 -------
.../google/code/gossip/LocalGossipMember.java | 70 ------
.../google/code/gossip/RemoteGossipMember.java | 53 -----
.../com/google/code/gossip/StartupSettings.java | 212 ------------------
.../code/gossip/event/GossipListener.java | 24 --
.../google/code/gossip/event/GossipState.java | 28 ---
.../code/gossip/examples/GossipExample.java | 99 ---------
.../code/gossip/manager/ActiveGossipThread.java | 76 -------
.../code/gossip/manager/GossipManager.java | 217 -------------------
.../gossip/manager/PassiveGossipThread.java | 169 ---------------
.../OnlyProcessReceivedPassiveGossipThread.java | 128 -----------
.../impl/SendMembersActiveGossipThread.java | 100 ---------
.../random/RandomActiveGossipThread.java | 55 -----
.../manager/random/RandomGossipManager.java | 34 ---
.../code/gossip/model/ActiveGossipMessage.java | 22 --
.../google/code/gossip/model/GossipMember.java | 63 ------
.../java/org/apache/gossip/GossipMember.java | 168 ++++++++++++++
.../java/org/apache/gossip/GossipRunner.java | 60 +++++
.../java/org/apache/gossip/GossipService.java | 86 ++++++++
.../java/org/apache/gossip/GossipSettings.java | 90 ++++++++
.../org/apache/gossip/GossipTimeoutTimer.java | 78 +++++++
.../org/apache/gossip/LocalGossipMember.java | 70 ++++++
.../org/apache/gossip/RemoteGossipMember.java | 53 +++++
.../java/org/apache/gossip/StartupSettings.java | 212 ++++++++++++++++++
.../org/apache/gossip/event/GossipListener.java | 24 ++
.../org/apache/gossip/event/GossipState.java | 28 +++
.../apache/gossip/examples/GossipExample.java | 99 +++++++++
.../gossip/manager/ActiveGossipThread.java | 76 +++++++
.../apache/gossip/manager/GossipManager.java | 217 +++++++++++++++++++
.../gossip/manager/PassiveGossipThread.java | 169 +++++++++++++++
.../OnlyProcessReceivedPassiveGossipThread.java | 128 +++++++++++
.../impl/SendMembersActiveGossipThread.java | 99 +++++++++
.../random/RandomActiveGossipThread.java | 55 +++++
.../manager/random/RandomGossipManager.java | 34 +++
.../gossip/model/ActiveGossipMessage.java | 22 ++
.../org/apache/gossip/model/GossipMember.java | 63 ++++++
.../io/teknek/gossip/ShutdownDeadtimeTest.java | 12 +-
.../io/teknek/gossip/StartupSettingsTest.java | 8 +-
.../io/teknek/gossip/TenNodeThreeSeedTest.java | 12 +-
43 files changed, 1847 insertions(+), 1849 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/968203e8/src/main/java/com/google/code/gossip/GossipMember.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/google/code/gossip/GossipMember.java b/src/main/java/com/google/code/gossip/GossipMember.java
deleted file mode 100644
index 56029fa..0000000
--- a/src/main/java/com/google/code/gossip/GossipMember.java
+++ /dev/null
@@ -1,168 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.google.code.gossip;
-
-import java.net.InetSocketAddress;
-
-/**
- * A abstract class representing a gossip member.
- *
- * @author joshclemm, harmenw
- */
-public abstract class GossipMember implements Comparable<GossipMember> {
-
-
- protected final String host;
-
- protected final int port;
-
- protected volatile long heartbeat;
-
- protected final String clusterName;
-
- /**
- * The purpose of the id field is to be able for nodes to identify themselves beyond there
- * host/port. For example an application might generate a persistent id so if they rejoin the
- * cluster at a different host and port we are aware it is the same node.
- */
- protected String id;
-
- /**
- * Constructor.
- *
- * @param host
- * The hostname or IP address.
- * @param port
- * The port number.
- * @param heartbeat
- * The current heartbeat.
- * @param id
- * an id that may be replaced after contact
- */
- public GossipMember(String clusterName, String host, int port, String id, long heartbeat) {
- this.clusterName = clusterName;
- this.host = host;
- this.port = port;
- this.id = id;
- this.heartbeat = heartbeat;
- }
-
- /**
- * Get the name of the cluster the member belongs to.
- *
- * @return The cluster name
- */
- public String getClusterName() {
- return clusterName;
- }
-
- /**
- * Get the hostname or IP address of the remote gossip member.
- *
- * @return The hostname or IP address.
- */
- public String getHost() {
- return host;
- }
-
- /**
- * Get the port number of the remote gossip member.
- *
- * @return The port number.
- */
- public int getPort() {
- return port;
- }
-
- /**
- * The member address in the form IP/host:port Similar to the toString in
- * {@link InetSocketAddress}
- */
- public String getAddress() {
- return host + ":" + port;
- }
-
- /**
- * Get the heartbeat of this gossip member.
- *
- * @return The current heartbeat.
- */
- public long getHeartbeat() {
- return heartbeat;
- }
-
- /**
- * Set the heartbeat of this gossip member.
- *
- * @param heartbeat
- * The new heartbeat.
- */
- public void setHeartbeat(long heartbeat) {
- this.heartbeat = heartbeat;
- }
-
- public String getId() {
- return id;
- }
-
- public void setId(String _id) {
- this.id = _id;
- }
-
- public String toString() {
- return "Member [address=" + getAddress() + ", id=" + id + ", heartbeat=" + heartbeat + "]";
- }
-
- /**
- * @see java.lang.Object#hashCode()
- */
- @Override
- public int hashCode() {
- final int prime = 31;
- int result = 1;
- String address = getAddress();
- result = prime * result + ((address == null) ? 0 : address.hashCode()) + clusterName == null ? 0
- : clusterName.hashCode();
- return result;
- }
-
- /**
- * @see java.lang.Object#equals(java.lang.Object)
- */
- @Override
- public boolean equals(Object obj) {
- if (this == obj) {
- return true;
- }
- if (obj == null) {
- System.err.println("equals(): obj is null.");
- return false;
- }
- if (!(obj instanceof GossipMember)) {
- System.err.println("equals(): obj is not of type GossipMember.");
- return false;
- }
- // The object is the same of they both have the same address (hostname and port).
- return getAddress().equals(((LocalGossipMember) obj).getAddress())
- && getClusterName().equals(((LocalGossipMember) obj).getClusterName());
- }
-
- public int compareTo(GossipMember other) {
- return this.getAddress().compareTo(other.getAddress());
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/968203e8/src/main/java/com/google/code/gossip/GossipRunner.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/google/code/gossip/GossipRunner.java b/src/main/java/com/google/code/gossip/GossipRunner.java
deleted file mode 100644
index 7530fd9..0000000
--- a/src/main/java/com/google/code/gossip/GossipRunner.java
+++ /dev/null
@@ -1,60 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.google.code.gossip;
-
-import java.io.File;
-import java.io.FileNotFoundException;
-import java.io.IOException;
-
-import org.json.JSONException;
-
-public class GossipRunner {
-
- public static void main(String[] args) {
- File configFile;
- if (args.length == 1) {
- configFile = new File("./" + args[0]);
- } else {
- configFile = new File("gossip.conf");
- }
- new GossipRunner(configFile);
- }
-
- public GossipRunner(File configFile) {
- if (configFile != null && configFile.exists()) {
- try {
- System.out.println("Parsing the configuration file...");
- StartupSettings _settings = StartupSettings.fromJSONFile(configFile);
- GossipService gossipService = new GossipService(_settings);
- System.out.println("Gossip service successfully initialized, let's start it...");
- gossipService.start();
- } catch (FileNotFoundException e) {
- System.err.println("The given file is not found!");
- } catch (JSONException e) {
- System.err.println("The given file is not in the correct JSON format!");
- } catch (IOException e) {
- System.err.println("Could not read the configuration file: " + e.getMessage());
- } catch (InterruptedException e) {
- System.err.println("Error while starting the gossip service: " + e.getMessage());
- }
- } else {
- System.out
- .println("The gossip.conf file is not found.\n\nEither specify the path to the startup settings file or place the gossip.json file in the same folder as the JAR file.");
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/968203e8/src/main/java/com/google/code/gossip/GossipService.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/google/code/gossip/GossipService.java b/src/main/java/com/google/code/gossip/GossipService.java
deleted file mode 100644
index 2226a48..0000000
--- a/src/main/java/com/google/code/gossip/GossipService.java
+++ /dev/null
@@ -1,87 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.google.code.gossip;
-
-import java.net.InetAddress;
-import java.net.UnknownHostException;
-import java.util.List;
-
-import org.apache.log4j.Logger;
-
-import com.google.code.gossip.event.GossipListener;
-import com.google.code.gossip.manager.GossipManager;
-import com.google.code.gossip.manager.random.RandomGossipManager;
-
-/**
- * This object represents the service which is responsible for gossiping with other gossip members.
- *
- * @author joshclemm, harmenw
- */
-public class GossipService {
-
- public static final Logger LOGGER = Logger.getLogger(GossipService.class);
-
- private GossipManager gossipManager;
-
- /**
- * Constructor with the default settings.
- *
- * @throws InterruptedException
- * @throws UnknownHostException
- */
- public GossipService(StartupSettings startupSettings) throws InterruptedException,
- UnknownHostException {
- this(startupSettings.getCluster(), InetAddress.getLocalHost().getHostAddress(), startupSettings
- .getPort(), startupSettings.getId(), startupSettings.getGossipMembers(),
- startupSettings.getGossipSettings(), null);
- }
-
- /**
- * Setup the client's lists, gossiping parameters, and parse the startup config file.
- *
- * @throws InterruptedException
- * @throws UnknownHostException
- */
- public GossipService(String cluster, String ipAddress, int port, String id,
- List<GossipMember> gossipMembers, GossipSettings settings, GossipListener listener)
- throws InterruptedException, UnknownHostException {
- gossipManager = new RandomGossipManager(cluster, ipAddress, port, id, settings, gossipMembers,
- listener);
- }
-
- public void start() {
- String address = get_gossipManager().getMyself().getHost() + ":"
- + get_gossipManager().getMyself().getPort();
- LOGGER.debug("Starting: " + gossipManager.getName() + " - " + address);
-
- gossipManager.start();
- }
-
- public void shutdown() {
- gossipManager.shutdown();
- }
-
- public GossipManager get_gossipManager() {
- return gossipManager;
- }
-
- public void set_gossipManager(GossipManager _gossipManager) {
- this.gossipManager = _gossipManager;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/968203e8/src/main/java/com/google/code/gossip/GossipSettings.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/google/code/gossip/GossipSettings.java b/src/main/java/com/google/code/gossip/GossipSettings.java
deleted file mode 100644
index ec9aae1..0000000
--- a/src/main/java/com/google/code/gossip/GossipSettings.java
+++ /dev/null
@@ -1,90 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.google.code.gossip;
-
-/**
- * In this object the settings used by the GossipService are held.
- *
- * @author harmenw
- */
-public class GossipSettings {
-
- /** Time between gossip'ing in ms. Default is 1 second. */
- private int gossipInterval = 1000;
-
- /** Time between cleanups in ms. Default is 10 seconds. */
- private int cleanupInterval = 10000;
-
- /**
- * Construct GossipSettings with default settings.
- */
- public GossipSettings() {
- }
-
- /**
- * Construct GossipSettings with given settings.
- *
- * @param gossipInterval
- * The gossip interval in ms.
- * @param cleanupInterval
- * The cleanup interval in ms.
- */
- public GossipSettings(int gossipInterval, int cleanupInterval) {
- this.gossipInterval = gossipInterval;
- this.cleanupInterval = cleanupInterval;
- }
-
- /**
- * Set the gossip interval. This is the time between a gossip message is send.
- *
- * @param gossipInterval
- * The gossip interval in ms.
- */
- public void setGossipTimeout(int gossipInterval) {
- this.gossipInterval = gossipInterval;
- }
-
- /**
- * Set the cleanup interval. This is the time between the last heartbeat received from a member
- * and when it will be marked as dead.
- *
- * @param cleanupInterval
- * The cleanup interval in ms.
- */
- public void setCleanupInterval(int cleanupInterval) {
- this.cleanupInterval = cleanupInterval;
- }
-
- /**
- * Get the gossip interval.
- *
- * @return The gossip interval in ms.
- */
- public int getGossipInterval() {
- return gossipInterval;
- }
-
- /**
- * Get the clean interval.
- *
- * @return The cleanup interval.
- */
- public int getCleanupInterval() {
- return cleanupInterval;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/968203e8/src/main/java/com/google/code/gossip/GossipTimeoutTimer.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/google/code/gossip/GossipTimeoutTimer.java b/src/main/java/com/google/code/gossip/GossipTimeoutTimer.java
deleted file mode 100644
index a1bf130..0000000
--- a/src/main/java/com/google/code/gossip/GossipTimeoutTimer.java
+++ /dev/null
@@ -1,78 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.google.code.gossip;
-
-import java.util.Date;
-
-import javax.management.NotificationListener;
-import javax.management.timer.Timer;
-
-/**
- * This object represents a timer for a gossip member. When the timer has elapsed without being
- * reset in the meantime, it will inform the GossipService about this who in turn will put the
- * gossip member on the dead list, because it is apparantly not alive anymore.
- *
- * @author joshclemm, harmenw
- */
-public class GossipTimeoutTimer extends Timer {
-
- private final long sleepTime;
-
- private final LocalGossipMember source;
-
- /**
- * Constructor. Creates a reset-able timer that wakes up after millisecondsSleepTime.
- *
- * @param millisecondsSleepTime
- * The time for this timer to wait before an event.
- * @param notificationListener
- * @param member
- */
- public GossipTimeoutTimer(long millisecondsSleepTime, NotificationListener notificationListener,
- LocalGossipMember member) {
- super();
- sleepTime = millisecondsSleepTime;
- source = member;
- addNotificationListener(notificationListener, null, null);
- }
-
- /**
- * @see javax.management.timer.Timer#start()
- */
- public void start() {
- this.reset();
- super.start();
- }
-
- /**
- * Resets timer to start counting down from original time.
- */
- public void reset() {
- removeAllNotifications();
- setWakeupTime(sleepTime);
- }
-
- /**
- * Adds a new wake-up time for this timer.
- *
- * @param milliseconds
- */
- private void setWakeupTime(long milliseconds) {
- addNotification("type", "message", source, new Date(System.currentTimeMillis() + milliseconds));
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/968203e8/src/main/java/com/google/code/gossip/LocalGossipMember.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/google/code/gossip/LocalGossipMember.java b/src/main/java/com/google/code/gossip/LocalGossipMember.java
deleted file mode 100644
index 216da96..0000000
--- a/src/main/java/com/google/code/gossip/LocalGossipMember.java
+++ /dev/null
@@ -1,70 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.google.code.gossip;
-
-import javax.management.NotificationListener;
-
-/**
- * This object represent a gossip member with the properties known locally. These objects are stored
- * in the local list of gossip member.s
- *
- * @author harmenw
- */
-public class LocalGossipMember extends GossipMember {
- /** The timeout timer for this gossip member. */
- private final transient GossipTimeoutTimer timeoutTimer;
-
- /**
- * Constructor.
- *
- * @param hostname
- * The hostname or IP address.
- * @param port
- * The port number.
- * @param id
- * @param heartbeat
- * The current heartbeat.
- * @param notificationListener
- * @param cleanupTimeout
- * The cleanup timeout for this gossip member.
- */
- public LocalGossipMember(String clusterName, String hostname, int port, String id,
- long heartbeat, NotificationListener notificationListener, int cleanupTimeout) {
- super(clusterName, hostname, port, id, heartbeat);
-
- timeoutTimer = new GossipTimeoutTimer(cleanupTimeout, notificationListener, this);
- }
-
- /**
- * Start the timeout timer.
- */
- public void startTimeoutTimer() {
- timeoutTimer.start();
- }
-
- /**
- * Reset the timeout timer.
- */
- public void resetTimeoutTimer() {
- timeoutTimer.reset();
- }
-
- public void disableTimer() {
- timeoutTimer.removeAllNotifications();
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/968203e8/src/main/java/com/google/code/gossip/RemoteGossipMember.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/google/code/gossip/RemoteGossipMember.java b/src/main/java/com/google/code/gossip/RemoteGossipMember.java
deleted file mode 100644
index a7c3a1f..0000000
--- a/src/main/java/com/google/code/gossip/RemoteGossipMember.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.google.code.gossip;
-
-/**
- * The object represents a gossip member with the properties as received from a remote gossip
- * member.
- *
- * @author harmenw
- */
-public class RemoteGossipMember extends GossipMember {
-
- /**
- * Constructor.
- *
- * @param hostname
- * The hostname or IP address.
- * @param port
- * The port number.
- * @param heartbeat
- * The current heartbeat.
- */
- public RemoteGossipMember(String clusterName, String hostname, int port, String id, long heartbeat) {
- super(clusterName, hostname, port, id, heartbeat);
- }
-
- /**
- * Construct a RemoteGossipMember with a heartbeat of 0.
- *
- * @param hostname
- * The hostname or IP address.
- * @param port
- * The port number.
- */
- public RemoteGossipMember(String clusterName, String hostname, int port, String id) {
- super(clusterName, hostname, port, id, System.currentTimeMillis());
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/968203e8/src/main/java/com/google/code/gossip/StartupSettings.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/google/code/gossip/StartupSettings.java b/src/main/java/com/google/code/gossip/StartupSettings.java
deleted file mode 100644
index 53ed725..0000000
--- a/src/main/java/com/google/code/gossip/StartupSettings.java
+++ /dev/null
@@ -1,212 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.google.code.gossip;
-
-import java.io.BufferedReader;
-import java.io.File;
-import java.io.FileNotFoundException;
-import java.io.FileReader;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.log4j.Logger;
-import org.json.JSONArray;
-import org.json.JSONException;
-import org.json.JSONObject;
-
-/**
- * This object represents the settings used when starting the gossip service.
- *
- * @author harmenw
- */
-public class StartupSettings {
- private static final Logger log = Logger.getLogger(StartupSettings.class);
-
- /** The id to use fo the service */
- private String id;
-
- /** The port to start the gossip service on. */
- private int port;
-
- private String cluster;
-
- /** The gossip settings used at startup. */
- private final GossipSettings gossipSettings;
-
- /** The list with gossip members to start with. */
- private final List<GossipMember> gossipMembers;
-
- /**
- * Constructor.
- *
- * @param id
- * The id to be used for this service
- * @param port
- * The port to start the service on.
- * @param logLevel
- * unused
- */
- public StartupSettings(String id, int port, int logLevel, String cluster) {
- this(id, port, new GossipSettings(), cluster);
- }
-
- /**
- * Constructor.
- *
- * @param id
- * The id to be used for this service
- * @param port
- * The port to start the service on.
- */
- public StartupSettings(String id, int port, GossipSettings gossipSettings, String cluster) {
- this.id = id;
- this.port = port;
- this.gossipSettings = gossipSettings;
- this.setCluster(cluster);
- gossipMembers = new ArrayList<>();
- }
-
- public void setCluster(String cluster) {
- this.cluster = cluster;
- }
-
- public String getCluster() {
- return cluster;
- }
-
- /**
- * Set the id to be used for this service.
- *
- * @param id
- * The id for this service.
- */
- public void setId(String id) {
- this.id = id;
- }
-
- /**
- * Get the id for this service.
- *
- * @return the service's id.
- */
- public String getId() {
- return id;
- }
-
- /**
- * Set the port of the gossip service.
- *
- * @param port
- * The port for the gossip service.
- */
- public void setPort(int port) {
- this.port = port;
- }
-
- /**
- * Get the port for the gossip service.
- *
- * @return The port of the gossip service.
- */
- public int getPort() {
- return port;
- }
-
- /**
- * Get the GossipSettings.
- *
- * @return The GossipSettings object.
- */
- public GossipSettings getGossipSettings() {
- return gossipSettings;
- }
-
- /**
- * Add a gossip member to the list of members to start with.
- *
- * @param member
- * The member to add.
- */
- public void addGossipMember(GossipMember member) {
- gossipMembers.add(member);
- }
-
- /**
- * Get the list with gossip members.
- *
- * @return The gossip members.
- */
- public List<GossipMember> getGossipMembers() {
- return gossipMembers;
- }
-
- /**
- * Parse the settings for the gossip service from a JSON file.
- *
- * @param jsonFile
- * The file object which refers to the JSON config file.
- * @return The StartupSettings object with the settings from the config file.
- * @throws JSONException
- * Thrown when the file is not well-formed JSON.
- * @throws FileNotFoundException
- * Thrown when the file cannot be found.
- * @throws IOException
- * Thrown when reading the file gives problems.
- */
- public static StartupSettings fromJSONFile(File jsonFile) throws JSONException,
- FileNotFoundException, IOException {
- // Read the file to a String.
- StringBuffer buffer = new StringBuffer();
- try (BufferedReader br = new BufferedReader(new FileReader(jsonFile)) ){
- String line;
- while ((line = br.readLine()) != null) {
- buffer.append(line.trim());
- }
- }
-
- JSONObject jsonObject = new JSONArray(buffer.toString()).getJSONObject(0);
- int port = jsonObject.getInt("port");
- String id = jsonObject.getString("id");
- int gossipInterval = jsonObject.getInt("gossip_interval");
- int cleanupInterval = jsonObject.getInt("cleanup_interval");
- String cluster = jsonObject.getString("cluster");
- if (cluster == null){
- throw new IllegalArgumentException("cluster was null. It is required");
- }
- StartupSettings settings = new StartupSettings(id, port, new GossipSettings(gossipInterval,
- cleanupInterval), cluster);
-
- // Now iterate over the members from the config file and add them to the settings.
- String configMembersDetails = "Config-members [";
- JSONArray membersJSON = jsonObject.getJSONArray("members");
- for (int i = 0; i < membersJSON.length(); i++) {
- JSONObject memberJSON = membersJSON.getJSONObject(i);
- RemoteGossipMember member = new RemoteGossipMember(memberJSON.getString("cluster"),
- memberJSON.getString("host"), memberJSON.getInt("port"), "");
- settings.addGossipMember(member);
- configMembersDetails += member.getAddress();
- if (i < (membersJSON.length() - 1))
- configMembersDetails += ", ";
- }
- log.info(configMembersDetails + "]");
-
- // Return the created settings object.
- return settings;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/968203e8/src/main/java/com/google/code/gossip/event/GossipListener.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/google/code/gossip/event/GossipListener.java b/src/main/java/com/google/code/gossip/event/GossipListener.java
deleted file mode 100644
index 424984c..0000000
--- a/src/main/java/com/google/code/gossip/event/GossipListener.java
+++ /dev/null
@@ -1,24 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.google.code.gossip.event;
-
-import com.google.code.gossip.GossipMember;
-
-public interface GossipListener {
- void gossipEvent(GossipMember member, GossipState state);
-}
http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/968203e8/src/main/java/com/google/code/gossip/event/GossipState.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/google/code/gossip/event/GossipState.java b/src/main/java/com/google/code/gossip/event/GossipState.java
deleted file mode 100644
index e303c89..0000000
--- a/src/main/java/com/google/code/gossip/event/GossipState.java
+++ /dev/null
@@ -1,28 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.google.code.gossip.event;
-
-public enum GossipState {
- UP("up"), DOWN("down");
- @SuppressWarnings("unused")
- private final String state;
-
- private GossipState(String state) {
- this.state = state;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/968203e8/src/main/java/com/google/code/gossip/examples/GossipExample.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/google/code/gossip/examples/GossipExample.java b/src/main/java/com/google/code/gossip/examples/GossipExample.java
deleted file mode 100644
index b82bb40..0000000
--- a/src/main/java/com/google/code/gossip/examples/GossipExample.java
+++ /dev/null
@@ -1,99 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.google.code.gossip.examples;
-
-import java.net.InetAddress;
-import java.net.UnknownHostException;
-import java.util.ArrayList;
-import java.util.List;
-
-import com.google.code.gossip.GossipMember;
-import com.google.code.gossip.GossipService;
-import com.google.code.gossip.GossipSettings;
-import com.google.code.gossip.RemoteGossipMember;
-
-/**
- * This class is an example of how one could use the gossip service. Here we start multiple gossip
- * clients on this host as specified in the config file.
- *
- * @author harmenw
- */
-public class GossipExample extends Thread {
- /** The number of clients to start. */
- private static final int NUMBER_OF_CLIENTS = 4;
-
- /**
- * @param args
- */
- public static void main(String[] args) {
- new GossipExample();
- }
-
- /**
- * Constructor. This will start the this thread.
- */
- public GossipExample() {
- start();
- }
-
- /**
- * @see java.lang.Thread#run()
- */
- public void run() {
- try {
- GossipSettings settings = new GossipSettings();
-
- List<GossipService> clients = new ArrayList<>();
-
- // Get my ip address.
- String myIpAddress = InetAddress.getLocalHost().getHostAddress();
-
- String cluster = "My Gossip Cluster";
-
- // Create the gossip members and put them in a list and give them a port number starting with
- // 2000.
- List<GossipMember> startupMembers = new ArrayList<>();
- for (int i = 0; i < NUMBER_OF_CLIENTS; ++i) {
- startupMembers.add(new RemoteGossipMember(cluster, myIpAddress, 2000 + i, ""));
- }
-
- // Lets start the gossip clients.
- // Start the clients, waiting cleaning-interval + 1 second between them which will show the
- // dead list handling.
- for (GossipMember member : startupMembers) {
- GossipService gossipService = new GossipService(cluster, myIpAddress, member.getPort(), "",
- startupMembers, settings, null);
- clients.add(gossipService);
- gossipService.start();
- sleep(settings.getCleanupInterval() + 1000);
- }
-
- // After starting all gossip clients, first wait 10 seconds and then shut them down.
- sleep(10000);
- System.err.println("Going to shutdown all services...");
- // Since they all run in the same virtual machine and share the same executor, if one is
- // shutdown they will all stop.
- clients.get(0).shutdown();
-
- } catch (UnknownHostException e) {
- e.printStackTrace();
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/968203e8/src/main/java/com/google/code/gossip/manager/ActiveGossipThread.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/google/code/gossip/manager/ActiveGossipThread.java b/src/main/java/com/google/code/gossip/manager/ActiveGossipThread.java
deleted file mode 100644
index 9b0bd5c..0000000
--- a/src/main/java/com/google/code/gossip/manager/ActiveGossipThread.java
+++ /dev/null
@@ -1,76 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.google.code.gossip.manager;
-
-import java.util.List;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-import com.google.code.gossip.GossipService;
-import com.google.code.gossip.LocalGossipMember;
-
-/**
- * [The active thread: periodically send gossip request.] The class handles gossiping the membership
- * list. This information is important to maintaining a common state among all the nodes, and is
- * important for detecting failures.
- */
-abstract public class ActiveGossipThread implements Runnable {
-
- protected final GossipManager gossipManager;
-
- private final AtomicBoolean keepRunning;
-
- public ActiveGossipThread(GossipManager gossipManager) {
- this.gossipManager = gossipManager;
- this.keepRunning = new AtomicBoolean(true);
- }
-
- @Override
- public void run() {
- while (keepRunning.get()) {
- try {
- TimeUnit.MILLISECONDS.sleep(gossipManager.getSettings().getGossipInterval());
- sendMembershipList(gossipManager.getMyself(), gossipManager.getMemberList());
- } catch (InterruptedException e) {
- GossipService.LOGGER.error(e);
- keepRunning.set(false);
- }
- }
- shutdown();
- }
-
- public void shutdown() {
- keepRunning.set(false);
- }
-
- /**
- * Performs the sending of the membership list, after we have incremented our own heartbeat.
- */
- abstract protected void sendMembershipList(LocalGossipMember me,
- List<LocalGossipMember> memberList);
-
- /**
- * Abstract method which should be implemented by a subclass. This method should return a member
- * of the list to gossip with.
- *
- * @param memberList
- * The list of members which are stored in the local list of members.
- * @return The chosen LocalGossipMember to gossip with.
- */
- abstract protected LocalGossipMember selectPartner(List<LocalGossipMember> memberList);
-}
http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/968203e8/src/main/java/com/google/code/gossip/manager/GossipManager.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/google/code/gossip/manager/GossipManager.java b/src/main/java/com/google/code/gossip/manager/GossipManager.java
deleted file mode 100644
index 42354b6..0000000
--- a/src/main/java/com/google/code/gossip/manager/GossipManager.java
+++ /dev/null
@@ -1,217 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.google.code.gossip.manager;
-
-import java.lang.reflect.InvocationTargetException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map.Entry;
-import java.util.concurrent.ConcurrentSkipListMap;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-import javax.management.Notification;
-import javax.management.NotificationListener;
-
-import org.apache.log4j.Logger;
-
-import com.google.code.gossip.GossipMember;
-import com.google.code.gossip.GossipService;
-import com.google.code.gossip.GossipSettings;
-import com.google.code.gossip.LocalGossipMember;
-import com.google.code.gossip.event.GossipListener;
-import com.google.code.gossip.event.GossipState;
-
-public abstract class GossipManager extends Thread implements NotificationListener {
-
- public static final Logger LOGGER = Logger.getLogger(GossipManager.class);
-
- public static final int MAX_PACKET_SIZE = 102400;
-
- private final ConcurrentSkipListMap<LocalGossipMember, GossipState> members;
-
- private final LocalGossipMember me;
-
- private final GossipSettings settings;
-
- private final AtomicBoolean gossipServiceRunning;
-
- private final Class<? extends PassiveGossipThread> passiveGossipThreadClass;
-
- private final Class<? extends ActiveGossipThread> activeGossipThreadClass;
-
- private final GossipListener listener;
-
- private ActiveGossipThread activeGossipThread;
-
- private PassiveGossipThread passiveGossipThread;
-
- private ExecutorService gossipThreadExecutor;
-
- public GossipManager(Class<? extends PassiveGossipThread> passiveGossipThreadClass,
- Class<? extends ActiveGossipThread> activeGossipThreadClass, String cluster,
- String address, int port, String id, GossipSettings settings,
- List<GossipMember> gossipMembers, GossipListener listener) {
- this.passiveGossipThreadClass = passiveGossipThreadClass;
- this.activeGossipThreadClass = activeGossipThreadClass;
- this.settings = settings;
- me = new LocalGossipMember(cluster, address, port, id, System.currentTimeMillis(), this,
- settings.getCleanupInterval());
- members = new ConcurrentSkipListMap<>();
- for (GossipMember startupMember : gossipMembers) {
- if (!startupMember.equals(me)) {
- LocalGossipMember member = new LocalGossipMember(startupMember.getClusterName(),
- startupMember.getHost(), startupMember.getPort(), startupMember.getId(),
- System.currentTimeMillis(), this, settings.getCleanupInterval());
- members.put(member, GossipState.UP);
- GossipService.LOGGER.debug(member);
- }
- }
- gossipThreadExecutor = Executors.newCachedThreadPool();
- gossipServiceRunning = new AtomicBoolean(true);
- this.listener = listener;
- Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
- public void run() {
- GossipService.LOGGER.debug("Service has been shutdown...");
- }
- }));
- }
-
- /**
- * All timers associated with a member will trigger this method when it goes off. The timer will
- * go off if we have not heard from this member in <code> _settings.T_CLEANUP </code> time.
- */
- @Override
- public void handleNotification(Notification notification, Object handback) {
- LocalGossipMember deadMember = (LocalGossipMember) notification.getUserData();
- GossipService.LOGGER.debug("Dead member detected: " + deadMember);
- members.put(deadMember, GossipState.DOWN);
- if (listener != null) {
- listener.gossipEvent(deadMember, GossipState.DOWN);
- }
- }
-
- public void revivieMember(LocalGossipMember m) {
- for (Entry<LocalGossipMember, GossipState> it : this.members.entrySet()) {
- if (it.getKey().getId().equals(m.getId())) {
- it.getKey().disableTimer();
- }
- }
- members.remove(m);
- members.put(m, GossipState.UP);
- if (listener != null) {
- listener.gossipEvent(m, GossipState.UP);
- }
- }
-
- public void createOrRevivieMember(LocalGossipMember m) {
- members.put(m, GossipState.UP);
- if (listener != null) {
- listener.gossipEvent(m, GossipState.UP);
- }
- }
-
- public GossipSettings getSettings() {
- return settings;
- }
-
- /**
- *
- * @return a read only list of members found in the UP state
- */
- public List<LocalGossipMember> getMemberList() {
- List<LocalGossipMember> up = new ArrayList<>();
- for (Entry<LocalGossipMember, GossipState> entry : members.entrySet()) {
- if (GossipState.UP.equals(entry.getValue())) {
- up.add(entry.getKey());
- }
- }
- return Collections.unmodifiableList(up);
- }
-
- public LocalGossipMember getMyself() {
- return me;
- }
-
- public List<LocalGossipMember> getDeadList() {
- List<LocalGossipMember> up = new ArrayList<>();
- for (Entry<LocalGossipMember, GossipState> entry : members.entrySet()) {
- if (GossipState.DOWN.equals(entry.getValue())) {
- up.add(entry.getKey());
- }
- }
- return Collections.unmodifiableList(up);
- }
-
- /**
- * Starts the client. Specifically, start the various cycles for this protocol. Start the gossip
- * thread and start the receiver thread.
- */
- public void run() {
- for (LocalGossipMember member : members.keySet()) {
- if (member != me) {
- member.startTimeoutTimer();
- }
- }
- try {
- passiveGossipThread = passiveGossipThreadClass.getConstructor(GossipManager.class)
- .newInstance(this);
- gossipThreadExecutor.execute(passiveGossipThread);
- activeGossipThread = activeGossipThreadClass.getConstructor(GossipManager.class)
- .newInstance(this);
- gossipThreadExecutor.execute(activeGossipThread);
- } catch (InstantiationException | IllegalAccessException | IllegalArgumentException
- | InvocationTargetException | NoSuchMethodException | SecurityException e1) {
- throw new RuntimeException(e1);
- }
- GossipService.LOGGER.debug("The GossipService is started.");
- while (gossipServiceRunning.get()) {
- try {
- // TODO
- TimeUnit.MILLISECONDS.sleep(1);
- } catch (InterruptedException e) {
- GossipService.LOGGER.warn("The GossipClient was interrupted.");
- }
- }
- }
-
- /**
- * Shutdown the gossip service.
- */
- public void shutdown() {
- gossipServiceRunning.set(false);
- gossipThreadExecutor.shutdown();
- if (passiveGossipThread != null) {
- passiveGossipThread.shutdown();
- }
- if (activeGossipThread != null) {
- activeGossipThread.shutdown();
- }
- try {
- boolean result = gossipThreadExecutor.awaitTermination(1000, TimeUnit.MILLISECONDS);
- if (!result) {
- LOGGER.error("executor shutdown timed out");
- }
- } catch (InterruptedException e) {
- LOGGER.error(e);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/968203e8/src/main/java/com/google/code/gossip/manager/PassiveGossipThread.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/google/code/gossip/manager/PassiveGossipThread.java b/src/main/java/com/google/code/gossip/manager/PassiveGossipThread.java
deleted file mode 100644
index 6bf1494..0000000
--- a/src/main/java/com/google/code/gossip/manager/PassiveGossipThread.java
+++ /dev/null
@@ -1,169 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.google.code.gossip.manager;
-
-import java.io.IOException;
-import java.net.DatagramPacket;
-import java.net.DatagramSocket;
-import java.net.InetSocketAddress;
-import java.net.SocketAddress;
-import java.net.SocketException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-import org.apache.log4j.Logger;
-import org.codehaus.jackson.map.ObjectMapper;
-import com.google.code.gossip.GossipMember;
-import com.google.code.gossip.GossipService;
-import com.google.code.gossip.RemoteGossipMember;
-import com.google.code.gossip.model.ActiveGossipMessage;
-
-/**
- * [The passive thread: reply to incoming gossip request.] This class handles the passive cycle,
- * where this client has received an incoming message. For now, this message is always the
- * membership list, but if you choose to gossip additional information, you will need some logic to
- * determine the incoming message.
- */
-abstract public class PassiveGossipThread implements Runnable {
-
- public static final Logger LOGGER = Logger.getLogger(PassiveGossipThread.class);
-
- /** The socket used for the passive thread of the gossip service. */
- private final DatagramSocket server;
-
- private final GossipManager gossipManager;
-
- private final AtomicBoolean keepRunning;
-
- private final String cluster;
-
- private final ObjectMapper MAPPER = new ObjectMapper();
-
- public PassiveGossipThread(GossipManager gossipManager) {
- this.gossipManager = gossipManager;
- try {
- SocketAddress socketAddress = new InetSocketAddress(gossipManager.getMyself().getHost(),
- gossipManager.getMyself().getPort());
- server = new DatagramSocket(socketAddress);
- GossipService.LOGGER.debug("Gossip service successfully initialized on port "
- + gossipManager.getMyself().getPort());
- GossipService.LOGGER.debug("I am " + gossipManager.getMyself());
- cluster = gossipManager.getMyself().getClusterName();
- if (cluster == null){
- throw new IllegalArgumentException("cluster was null");
- }
- } catch (SocketException ex) {
- GossipService.LOGGER.warn(ex);
- throw new RuntimeException(ex);
- }
- keepRunning = new AtomicBoolean(true);
- }
-
- @Override
- public void run() {
- while (keepRunning.get()) {
- try {
- byte[] buf = new byte[server.getReceiveBufferSize()];
- DatagramPacket p = new DatagramPacket(buf, buf.length);
- server.receive(p);
- int packet_length = 0;
- for (int i = 0; i < 4; i++) {
- int shift = (4 - 1 - i) * 8;
- packet_length += (buf[i] & 0x000000FF) << shift;
- }
- if (packet_length <= GossipManager.MAX_PACKET_SIZE) {
- byte[] json_bytes = new byte[packet_length];
- for (int i = 0; i < packet_length; i++) {
- json_bytes[i] = buf[i + 4];
- }
- if (GossipService.LOGGER.isDebugEnabled()){
- String receivedMessage = new String(json_bytes);
- GossipService.LOGGER.debug("Received message (" + packet_length + " bytes): "
- + receivedMessage);
- }
- try {
- List<GossipMember> remoteGossipMembers = new ArrayList<>();
- RemoteGossipMember senderMember = null;
- ActiveGossipMessage activeGossipMessage = MAPPER.readValue(json_bytes,
- ActiveGossipMessage.class);
- for (int i = 0; i < activeGossipMessage.getMembers().size(); i++) {
- RemoteGossipMember member = new RemoteGossipMember(
- activeGossipMessage.getMembers().get(i).getCluster(),
- activeGossipMessage.getMembers().get(i).getHost(),
- activeGossipMessage.getMembers().get(i).getPort(),
- activeGossipMessage.getMembers().get(i).getId(),
- activeGossipMessage.getMembers().get(i).getHeartbeat());
- if (!(member.getClusterName().equals(cluster))){
- GossipService.LOGGER.warn("Note a member of this cluster " + i);
- continue;
- }
- // This is the first member found, so this should be the member who is communicating
- // with me.
- if (i == 0) {
- senderMember = member;
- }
- remoteGossipMembers.add(member);
- }
- mergeLists(gossipManager, senderMember, remoteGossipMembers);
- } catch (RuntimeException ex) {
- GossipService.LOGGER.error("Unable to process message", ex);
- }
- } else {
- GossipService.LOGGER
- .error("The received message is not of the expected size, it has been dropped.");
- }
-
- } catch (IOException e) {
- GossipService.LOGGER.error(e);
- System.out.println(e);
- keepRunning.set(false);
- }
- }
- shutdown();
- }
-
- public void shutdown() {
- try {
- server.close();
- } catch (RuntimeException ex) {
- }
- }
-
- /**
- * Abstract method for merging the local and remote list.
- *
- * @param gossipManager
- * The GossipManager for retrieving the local members and dead members list.
- * @param senderMember
- * The member who is sending this list, this could be used to send a response if the
- * remote list contains out-dated information.
- * @param remoteList
- * The list of members known at the remote side.
- */
- abstract protected void mergeLists(GossipManager gossipManager, RemoteGossipMember senderMember,
- List<GossipMember> remoteList);
-}
-
-/*
- * random comments // Check whether the package is smaller than the maximal packet length. // A
- * package larger than this would not be possible to be send from a GossipService, // since this is
- * check before sending the message. // This could normally only occur when the list of members is
- * very big, // or when the packet is malformed, and the first 4 bytes is not the right in anymore.
- * // For this reason we regards the message.
- */
http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/968203e8/src/main/java/com/google/code/gossip/manager/impl/OnlyProcessReceivedPassiveGossipThread.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/google/code/gossip/manager/impl/OnlyProcessReceivedPassiveGossipThread.java b/src/main/java/com/google/code/gossip/manager/impl/OnlyProcessReceivedPassiveGossipThread.java
deleted file mode 100644
index 08d573a..0000000
--- a/src/main/java/com/google/code/gossip/manager/impl/OnlyProcessReceivedPassiveGossipThread.java
+++ /dev/null
@@ -1,128 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.google.code.gossip.manager.impl;
-
-import java.util.List;
-
-import com.google.code.gossip.GossipMember;
-import com.google.code.gossip.GossipService;
-import com.google.code.gossip.LocalGossipMember;
-import com.google.code.gossip.RemoteGossipMember;
-import com.google.code.gossip.manager.GossipManager;
-import com.google.code.gossip.manager.PassiveGossipThread;
-
-public class OnlyProcessReceivedPassiveGossipThread extends PassiveGossipThread {
-
- public OnlyProcessReceivedPassiveGossipThread(GossipManager gossipManager) {
- super(gossipManager);
- }
-
- /**
- * Merge remote list (received from peer), and our local member list. Simply, we must update the
- * heartbeats that the remote list has with our list. Also, some additional logic is needed to
- * make sure we have not timed out a member and then immediately received a list with that member.
- *
- * @param gossipManager
- * @param senderMember
- * @param remoteList
- */
- protected void mergeLists(GossipManager gossipManager, RemoteGossipMember senderMember,
- List<GossipMember> remoteList) {
-
- // if the person sending to us is in the dead list consider them up
- for (LocalGossipMember i : gossipManager.getDeadList()) {
- if (i.getId().equals(senderMember.getId())) {
- System.out.println(gossipManager.getMyself() + " caught a live one!");
- LocalGossipMember newLocalMember = new LocalGossipMember(senderMember.getClusterName(),
- senderMember.getHost(), senderMember.getPort(), senderMember.getId(),
- senderMember.getHeartbeat(), gossipManager, gossipManager.getSettings()
- .getCleanupInterval());
- gossipManager.revivieMember(newLocalMember);
- newLocalMember.startTimeoutTimer();
- }
- }
- for (GossipMember remoteMember : remoteList) {
- if (remoteMember.getId().equals(gossipManager.getMyself().getId())) {
- continue;
- }
- if (gossipManager.getMemberList().contains(remoteMember)) {
- LocalGossipMember localMember = gossipManager.getMemberList().get(
- gossipManager.getMemberList().indexOf(remoteMember));
- if (remoteMember.getHeartbeat() > localMember.getHeartbeat()) {
- localMember.setHeartbeat(remoteMember.getHeartbeat());
- localMember.resetTimeoutTimer();
- }
- } else if (!gossipManager.getMemberList().contains(remoteMember)
- && !gossipManager.getDeadList().contains(remoteMember)) {
- LocalGossipMember newLocalMember = new LocalGossipMember(remoteMember.getClusterName(),
- remoteMember.getHost(), remoteMember.getPort(), remoteMember.getId(),
- remoteMember.getHeartbeat(), gossipManager, gossipManager.getSettings()
- .getCleanupInterval());
- gossipManager.createOrRevivieMember(newLocalMember);
- newLocalMember.startTimeoutTimer();
- } else {
- if (gossipManager.getDeadList().contains(remoteMember)) {
- LocalGossipMember localDeadMember = gossipManager.getDeadList().get(
- gossipManager.getDeadList().indexOf(remoteMember));
- if (remoteMember.getHeartbeat() > localDeadMember.getHeartbeat()) {
- LocalGossipMember newLocalMember = new LocalGossipMember(remoteMember.getClusterName(),
- remoteMember.getHost(), remoteMember.getPort(), remoteMember.getId(),
- remoteMember.getHeartbeat(), gossipManager, gossipManager.getSettings()
- .getCleanupInterval());
- gossipManager.revivieMember(newLocalMember);
- newLocalMember.startTimeoutTimer();
- GossipService.LOGGER.debug("Removed remote member " + remoteMember.getAddress()
- + " from dead list and added to local member list.");
- } else {
- GossipService.LOGGER.debug("me " + gossipManager.getMyself());
- GossipService.LOGGER.debug("sender " + senderMember);
- GossipService.LOGGER.debug("remote " + remoteList);
- GossipService.LOGGER.debug("live " + gossipManager.getMemberList());
- GossipService.LOGGER.debug("dead " + gossipManager.getDeadList());
- }
- } else {
- GossipService.LOGGER.debug("me " + gossipManager.getMyself());
- GossipService.LOGGER.debug("sender " + senderMember);
- GossipService.LOGGER.debug("remote " + remoteList);
- GossipService.LOGGER.debug("live " + gossipManager.getMemberList());
- GossipService.LOGGER.debug("dead " + gossipManager.getDeadList());
- // throw new IllegalArgumentException("wtf");
- }
- }
- }
- }
-
-}
-
-/**
- * old comment section: // If a member is restarted the heartbeat will restart from 1, so we should
- * check // that here. // So a member can become from the dead when it is either larger than a
- * previous // heartbeat (due to network failure) // or when the heartbeat is 1 (after a restart of
- * the service). // TODO: What if the first message of a gossip service is sent to a dead node? The
- * // second member will receive a heartbeat of two. // TODO: The above does happen. Maybe a special
- * message for a revived member? // TODO: Or maybe when a member is declared dead for more than //
- * _settings.getCleanupInterval() ms, reset the heartbeat to 0. // It will then accept a revived
- * member. // The above is now handle by checking whether the heartbeat differs //
- * _settings.getCleanupInterval(), it must be restarted.
- */
-
-/*
- * // The remote member is back from the dead. // Remove it from the dead list. //
- * gossipManager.getDeadList().remove(localDeadMember); // Add it as a new member and add it to the
- * member list.
- */
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/968203e8/src/main/java/com/google/code/gossip/manager/impl/SendMembersActiveGossipThread.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/google/code/gossip/manager/impl/SendMembersActiveGossipThread.java b/src/main/java/com/google/code/gossip/manager/impl/SendMembersActiveGossipThread.java
deleted file mode 100644
index 2259781..0000000
--- a/src/main/java/com/google/code/gossip/manager/impl/SendMembersActiveGossipThread.java
+++ /dev/null
@@ -1,100 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.google.code.gossip.manager.impl;
-
-import java.io.IOException;
-import java.net.DatagramPacket;
-import java.net.DatagramSocket;
-import java.net.InetAddress;
-import java.nio.ByteBuffer;
-import java.util.List;
-
-import org.codehaus.jackson.map.ObjectMapper;
-
-import com.google.code.gossip.GossipService;
-import com.google.code.gossip.LocalGossipMember;
-import com.google.code.gossip.manager.ActiveGossipThread;
-import com.google.code.gossip.manager.GossipManager;
-import com.google.code.gossip.model.ActiveGossipMessage;
-import com.google.code.gossip.model.GossipMember;
-
-abstract public class SendMembersActiveGossipThread extends ActiveGossipThread {
-
- protected ObjectMapper om = new ObjectMapper();
-
- public SendMembersActiveGossipThread(GossipManager gossipManager) {
- super(gossipManager);
- }
-
- private GossipMember convert(LocalGossipMember member){
- GossipMember gm = new GossipMember();
- gm.setCluster(member.getClusterName());
- gm.setHeartbeat(member.getHeartbeat());
- gm.setHost(member.getHost());
- gm.setId(member.getId());
- gm.setPort(member.getPort());
- return gm;
- }
-
- /**
- * Performs the sending of the membership list, after we have incremented our own heartbeat.
- */
- protected void sendMembershipList(LocalGossipMember me, List<LocalGossipMember> memberList) {
- GossipService.LOGGER.debug("Send sendMembershipList() is called.");
- me.setHeartbeat(System.currentTimeMillis());
- LocalGossipMember member = selectPartner(memberList);
- if (member == null) {
- return;
- }
- try (DatagramSocket socket = new DatagramSocket()) {
- socket.setSoTimeout(gossipManager.getSettings().getGossipInterval());
- InetAddress dest = InetAddress.getByName(member.getHost());
- ActiveGossipMessage message = new ActiveGossipMessage();
- message.getMembers().add(convert(me));
- for (LocalGossipMember other : memberList) {
- message.getMembers().add(convert(other));
- }
- byte[] json_bytes = om.writeValueAsString(message).getBytes();
- int packet_length = json_bytes.length;
- if (packet_length < GossipManager.MAX_PACKET_SIZE) {
- byte[] buf = createBuffer(packet_length, json_bytes);
- DatagramPacket datagramPacket = new DatagramPacket(buf, buf.length, dest, member.getPort());
- socket.send(datagramPacket);
- } else {
- GossipService.LOGGER.error("The length of the to be send message is too large ("
- + packet_length + " > " + GossipManager.MAX_PACKET_SIZE + ").");
- }
- } catch (IOException e1) {
- GossipService.LOGGER.warn(e1);
- }
- }
-
- private byte[] createBuffer(int packetLength, byte[] jsonBytes) {
- byte[] lengthBytes = new byte[4];
- lengthBytes[0] = (byte) (packetLength >> 24);
- lengthBytes[1] = (byte) ((packetLength << 8) >> 24);
- lengthBytes[2] = (byte) ((packetLength << 16) >> 24);
- lengthBytes[3] = (byte) ((packetLength << 24) >> 24);
- ByteBuffer byteBuffer = ByteBuffer.allocate(4 + jsonBytes.length);
- byteBuffer.put(lengthBytes);
- byteBuffer.put(jsonBytes);
- byte[] buf = byteBuffer.array();
- return buf;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/968203e8/src/main/java/com/google/code/gossip/manager/random/RandomActiveGossipThread.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/google/code/gossip/manager/random/RandomActiveGossipThread.java b/src/main/java/com/google/code/gossip/manager/random/RandomActiveGossipThread.java
deleted file mode 100644
index 914f5ca..0000000
--- a/src/main/java/com/google/code/gossip/manager/random/RandomActiveGossipThread.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.google.code.gossip.manager.random;
-
-import java.util.List;
-import java.util.Random;
-
-import com.google.code.gossip.GossipService;
-import com.google.code.gossip.LocalGossipMember;
-import com.google.code.gossip.manager.GossipManager;
-import com.google.code.gossip.manager.impl.SendMembersActiveGossipThread;
-
-public class RandomActiveGossipThread extends SendMembersActiveGossipThread {
-
- /** The Random used for choosing a member to gossip with. */
- private final Random random;
-
- public RandomActiveGossipThread(GossipManager gossipManager) {
- super(gossipManager);
- random = new Random();
- }
-
- /**
- * [The selectToSend() function.] Find a random peer from the local membership list. In the case
- * where this client is the only member in the list, this method will return null.
- *
- * @return Member random member if list is greater than 1, null otherwise
- */
- protected LocalGossipMember selectPartner(List<LocalGossipMember> memberList) {
- LocalGossipMember member = null;
- if (memberList.size() > 0) {
- int randomNeighborIndex = random.nextInt(memberList.size());
- member = memberList.get(randomNeighborIndex);
- } else {
- GossipService.LOGGER.debug("I am alone in this world.");
- }
- return member;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/968203e8/src/main/java/com/google/code/gossip/manager/random/RandomGossipManager.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/google/code/gossip/manager/random/RandomGossipManager.java b/src/main/java/com/google/code/gossip/manager/random/RandomGossipManager.java
deleted file mode 100644
index c1e69d6..0000000
--- a/src/main/java/com/google/code/gossip/manager/random/RandomGossipManager.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.google.code.gossip.manager.random;
-
-import com.google.code.gossip.GossipMember;
-import com.google.code.gossip.GossipSettings;
-import com.google.code.gossip.event.GossipListener;
-import com.google.code.gossip.manager.GossipManager;
-import com.google.code.gossip.manager.impl.OnlyProcessReceivedPassiveGossipThread;
-
-import java.util.List;
-
-public class RandomGossipManager extends GossipManager {
- public RandomGossipManager(String cluster, String address, int port, String id,
- GossipSettings settings, List<GossipMember> gossipMembers, GossipListener listener) {
- super(OnlyProcessReceivedPassiveGossipThread.class, RandomActiveGossipThread.class, cluster,
- address, port, id, settings, gossipMembers, listener);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/968203e8/src/main/java/com/google/code/gossip/model/ActiveGossipMessage.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/google/code/gossip/model/ActiveGossipMessage.java b/src/main/java/com/google/code/gossip/model/ActiveGossipMessage.java
deleted file mode 100644
index d3516f5..0000000
--- a/src/main/java/com/google/code/gossip/model/ActiveGossipMessage.java
+++ /dev/null
@@ -1,22 +0,0 @@
-package com.google.code.gossip.model;
-
-import java.util.ArrayList;
-import java.util.List;
-
-public class ActiveGossipMessage {
-
- private List<GossipMember> members = new ArrayList<>();
-
- public ActiveGossipMessage(){
-
- }
-
- public List<GossipMember> getMembers() {
- return members;
- }
-
- public void setMembers(List<GossipMember> members) {
- this.members = members;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/968203e8/src/main/java/com/google/code/gossip/model/GossipMember.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/google/code/gossip/model/GossipMember.java b/src/main/java/com/google/code/gossip/model/GossipMember.java
deleted file mode 100644
index 6c073b4..0000000
--- a/src/main/java/com/google/code/gossip/model/GossipMember.java
+++ /dev/null
@@ -1,63 +0,0 @@
-package com.google.code.gossip.model;
-
-public class GossipMember {
-
- private String cluster;
- private String host;
- private Integer port;
- private String id;
- private Long heartbeat;
-
- public GossipMember(){
-
- }
-
- public GossipMember(String cluster, String host, Integer port, String id, Long heartbeat){
- this.cluster=cluster;
- this.host= host;
- this.port = port;
- this.id = id;
-
- }
-
- public String getCluster() {
- return cluster;
- }
-
- public void setCluster(String cluster) {
- this.cluster = cluster;
- }
-
- public String getHost() {
- return host;
- }
-
- public void setHost(String host) {
- this.host = host;
- }
-
- public Integer getPort() {
- return port;
- }
-
- public void setPort(Integer port) {
- this.port = port;
- }
-
- public String getId() {
- return id;
- }
-
- public void setId(String id) {
- this.id = id;
- }
-
- public Long getHeartbeat() {
- return heartbeat;
- }
-
- public void setHeartbeat(Long heartbeat) {
- this.heartbeat = heartbeat;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/968203e8/src/main/java/org/apache/gossip/GossipMember.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/gossip/GossipMember.java b/src/main/java/org/apache/gossip/GossipMember.java
new file mode 100644
index 0000000..fd44ddd
--- /dev/null
+++ b/src/main/java/org/apache/gossip/GossipMember.java
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gossip;
+
+import java.net.InetSocketAddress;
+
+/**
+ * An abstract class representing a gossip member.
+ *
+ * @author joshclemm, harmenw
+ */
+public abstract class GossipMember implements Comparable<GossipMember> {
+
+
+ protected final String host;
+
+ protected final int port;
+
+ protected volatile long heartbeat;
+
+ protected final String clusterName;
+
+ /**
+ * The purpose of the id field is to let nodes identify themselves beyond their
+ * host/port. For example, an application might generate a persistent id so that if a node rejoins
+ * the cluster at a different host and port we are aware it is the same node.
+ */
+ protected String id;
+
+ /**
+ * Constructor.
+ *
+ * @param host
+ * The hostname or IP address.
+ * @param port
+ * The port number.
+ * @param heartbeat
+ * The current heartbeat.
+ * @param id
+ * an id that may be replaced after contact
+ */
+ public GossipMember(String clusterName, String host, int port, String id, long heartbeat) {
+ this.clusterName = clusterName;
+ this.host = host;
+ this.port = port;
+ this.id = id;
+ this.heartbeat = heartbeat;
+ }
+
+ /**
+ * Get the name of the cluster the member belongs to.
+ *
+ * @return The cluster name
+ */
+ public String getClusterName() {
+ return clusterName;
+ }
+
+ /**
+ * Get the hostname or IP address of the remote gossip member.
+ *
+ * @return The hostname or IP address.
+ */
+ public String getHost() {
+ return host;
+ }
+
+ /**
+ * Get the port number of the remote gossip member.
+ *
+ * @return The port number.
+ */
+ public int getPort() {
+ return port;
+ }
+
+ /**
+ * The member address in the form IP/host:port Similar to the toString in
+ * {@link InetSocketAddress}
+ */
+ public String getAddress() {
+ return host + ":" + port;
+ }
+
+ /**
+ * Get the heartbeat of this gossip member.
+ *
+ * @return The current heartbeat.
+ */
+ public long getHeartbeat() {
+ return heartbeat;
+ }
+
+ /**
+ * Set the heartbeat of this gossip member.
+ *
+ * @param heartbeat
+ * The new heartbeat.
+ */
+ public void setHeartbeat(long heartbeat) {
+ this.heartbeat = heartbeat;
+ }
+
+ public String getId() {
+ return id;
+ }
+
+ public void setId(String _id) {
+ this.id = _id;
+ }
+
+ public String toString() {
+ return "Member [address=" + getAddress() + ", id=" + id + ", heartbeat=" + heartbeat + "]";
+ }
+
+ /**
+  * Hashes the two fields {@link #equals(Object)} compares: the address and the cluster name.
+  */
+ @Override
+ public int hashCode() {
+   final int prime = 31;
+   String address = getAddress();
+   int result = prime + ((address == null) ? 0 : address.hashCode());
+   // Each ternary is parenthesised: "x + clusterName == null ? ..." parses as
+   // "(x + clusterName) == null ? ...", dropping the address hash and NPE-ing on a null name.
+   return prime * result + ((clusterName == null) ? 0 : clusterName.hashCode());
+ }
+
+ /**
+  * Two members are equal when their address (host and port) and cluster name both match;
+  * heartbeat and id are ignored. Returns false for null and for non-GossipMember objects.
+  *
+  * @see java.lang.Object#equals(java.lang.Object)
+  */
+ @Override
+ public boolean equals(Object obj) {
+   if (this == obj) {
+     return true;
+   }
+   // instanceof is false for null; a mismatch is a normal outcome, so nothing is printed.
+   if (!(obj instanceof GossipMember)) {
+     return false;
+   }
+   // Cast to the base type: obj may be any subclass, not only LocalGossipMember.
+   GossipMember other = (GossipMember) obj;
+   return getAddress().equals(other.getAddress())
+       && (clusterName == null ? other.clusterName == null : clusterName.equals(other.clusterName));
+ }
+
+ public int compareTo(GossipMember other) {
+ return this.getAddress().compareTo(other.getAddress());
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/968203e8/src/main/java/org/apache/gossip/GossipRunner.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/gossip/GossipRunner.java b/src/main/java/org/apache/gossip/GossipRunner.java
new file mode 100644
index 0000000..d995cce
--- /dev/null
+++ b/src/main/java/org/apache/gossip/GossipRunner.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gossip;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+
+import org.json.JSONException;
+
+public class GossipRunner {
+
+ /**
+  * Entry point. Loads the configuration file named by the single command-line argument,
+  * or "gossip.conf" in the working directory when no (or more than one) argument is given.
+  */
+ public static void main(String[] args) {
+   // Use the path as given: prefixing "./" would turn an absolute path into a relative one.
+   File configFile = new File(args.length == 1 ? args[0] : "gossip.conf");
+   new GossipRunner(configFile);
+ }
+
+ /** Parses configFile, then builds and starts a gossip service; caught failures are printed. */
+ public GossipRunner(File configFile) {
+   if (configFile == null || !configFile.exists()) {
+     System.out.println("The gossip.conf file is not found.\n\nEither specify the path to the startup settings file or place the gossip.conf file in the same folder as the JAR file.");
+     return;
+   }
+   try {
+     System.out.println("Parsing the configuration file...");
+     GossipService gossipService = new GossipService(StartupSettings.fromJSONFile(configFile));
+     System.out.println("Gossip service successfully initialized, let's start it...");
+     gossipService.start();
+   } catch (FileNotFoundException e) {
+     System.err.println("The given file is not found!");
+   } catch (JSONException e) {
+     System.err.println("The given file is not in the correct JSON format!");
+   } catch (IOException e) {
+     System.err.println("Could not read the configuration file: " + e.getMessage());
+   } catch (InterruptedException e) {
+     Thread.currentThread().interrupt(); // keep the interrupt visible to the caller
+     System.err.println("Error while starting the gossip service: " + e.getMessage());
+   }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/968203e8/src/main/java/org/apache/gossip/GossipService.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/gossip/GossipService.java b/src/main/java/org/apache/gossip/GossipService.java
new file mode 100644
index 0000000..9db740e
--- /dev/null
+++ b/src/main/java/org/apache/gossip/GossipService.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gossip;
+
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.List;
+
+import org.apache.gossip.event.GossipListener;
+import org.apache.gossip.manager.GossipManager;
+import org.apache.gossip.manager.random.RandomGossipManager;
+import org.apache.log4j.Logger;
+
+/**
+ * This object represents the service which is responsible for gossiping with other gossip members.
+ *
+ * @author joshclemm, harmenw
+ */
+public class GossipService {
+
+ public static final Logger LOGGER = Logger.getLogger(GossipService.class);
+
+ private GossipManager gossipManager;
+
+ /**
+ * Creates a service from startup settings, using this machine's local host address and no listener.
+ *
+ * @throws InterruptedException declared by the constructor this one delegates to
+ * @throws UnknownHostException if the local host address cannot be resolved
+ */
+ public GossipService(StartupSettings startupSettings) throws InterruptedException,
+ UnknownHostException {
+ this(startupSettings.getCluster(), InetAddress.getLocalHost().getHostAddress(), startupSettings
+ .getPort(), startupSettings.getId(), startupSettings.getGossipMembers(),
+ startupSettings.getGossipSettings(), null);
+ }
+
+ /**
+ * Setup the client's lists, gossiping parameters, and parse the startup config file.
+ *
+ * @throws InterruptedException
+ * @throws UnknownHostException
+ */
+ public GossipService(String cluster, String ipAddress, int port, String id,
+ List<GossipMember> gossipMembers, GossipSettings settings, GossipListener listener)
+ throws InterruptedException, UnknownHostException {
+ gossipManager = new RandomGossipManager(cluster, ipAddress, port, id, settings, gossipMembers,
+ listener);
+ }
+
+ public void start() {
+ String address = get_gossipManager().getMyself().getHost() + ":"
+ + get_gossipManager().getMyself().getPort();
+ LOGGER.debug("Starting: " + gossipManager.getName() + " - " + address);
+
+ gossipManager.start();
+ }
+
+ public void shutdown() {
+ gossipManager.shutdown();
+ }
+
+ public GossipManager get_gossipManager() {
+ return gossipManager;
+ }
+
+ public void set_gossipManager(GossipManager _gossipManager) {
+ this.gossipManager = _gossipManager;
+ }
+
+}
[02/12] incubator-gossip git commit: Remove dead code
Posted by ec...@apache.org.
Remove dead code
Project: http://git-wip-us.apache.org/repos/asf/incubator-gossip/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gossip/commit/8e8db1c8
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gossip/tree/8e8db1c8
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gossip/diff/8e8db1c8
Branch: refs/heads/master
Commit: 8e8db1c837661d3ee7314184518ecd71fcbc71aa
Parents: 3366f63
Author: Edward Capriolo <ed...@gmail.com>
Authored: Mon May 16 19:37:17 2016 -0400
Committer: Edward Capriolo <ed...@gmail.com>
Committed: Tue Jun 7 22:52:53 2016 -0400
----------------------------------------------------------------------
.../com/google/code/gossip/GossipMember.java | 30 +-------------------
1 file changed, 1 insertion(+), 29 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/8e8db1c8/src/main/java/com/google/code/gossip/GossipMember.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/google/code/gossip/GossipMember.java b/src/main/java/com/google/code/gossip/GossipMember.java
index 56a5f44..314b5b7 100644
--- a/src/main/java/com/google/code/gossip/GossipMember.java
+++ b/src/main/java/com/google/code/gossip/GossipMember.java
@@ -29,16 +29,7 @@ import org.json.JSONObject;
*/
public abstract class GossipMember implements Comparable<GossipMember> {
- public static final String JSON_HOST = "host";
-
- public static final String JSON_PORT = "port";
-
- public static final String JSON_HEARTBEAT = "heartbeat";
-
- public static final String JSON_ID = "id";
-
- public static final String JSON_CLUSTER = "cluster";
-
+
protected final String host;
protected final int port;
@@ -174,25 +165,6 @@ public abstract class GossipMember implements Comparable<GossipMember> {
&& getClusterName().equals(((LocalGossipMember) obj).getClusterName());
}
- /**
- * Get the JSONObject which is the JSON representation of this GossipMember.
- *
- * @return The JSONObject of this GossipMember.
- */
- public JSONObject toJSONObject() {
- try {
- JSONObject jsonObject = new JSONObject();
- jsonObject.put(JSON_CLUSTER, clusterName);
- jsonObject.put(JSON_HOST, host);
- jsonObject.put(JSON_PORT, port);
- jsonObject.put(JSON_ID, id);
- jsonObject.put(JSON_HEARTBEAT, heartbeat);
- return jsonObject;
- } catch (JSONException e) {
- throw new RuntimeException(e);
- }
- }
-
public int compareTo(GossipMember other) {
return this.getAddress().compareTo(other.getAddress());
}
[07/12] incubator-gossip git commit: GOSSIP-2 rebase
Posted by ec...@apache.org.
GOSSIP-2 rebase
Project: http://git-wip-us.apache.org/repos/asf/incubator-gossip/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gossip/commit/900bfda9
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gossip/tree/900bfda9
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gossip/diff/900bfda9
Branch: refs/heads/master
Commit: 900bfda9555ee7d295cb722e01ad9b9c380b4b39
Parents: ddc9a67
Author: Edward Capriolo <ed...@gmail.com>
Authored: Tue Jun 7 22:52:20 2016 -0400
Committer: Edward Capriolo <ed...@gmail.com>
Committed: Tue Jun 7 22:52:54 2016 -0400
----------------------------------------------------------------------
.../java/org/apache/gossip/GossipService.java | 12 +++
.../manager/random/RandomGossipManager.java | 78 ++++++++++++++++
.../manager/RandomGossipManagerBuilderTest.java | 96 ++++++++++++++++++++
3 files changed, 186 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/900bfda9/src/main/java/org/apache/gossip/GossipService.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/gossip/GossipService.java b/src/main/java/org/apache/gossip/GossipService.java
index 3175706..d07c070 100644
--- a/src/main/java/org/apache/gossip/GossipService.java
+++ b/src/main/java/org/apache/gossip/GossipService.java
@@ -60,8 +60,20 @@ public class GossipService {
public GossipService(String cluster, URI uri, String id,
List<GossipMember> gossipMembers, GossipSettings settings, GossipListener listener)
throws InterruptedException, UnknownHostException {
+<<<<<<< HEAD
gossipManager = new RandomGossipManager(cluster, uri, id, settings, gossipMembers,
listener);
+=======
+ gossipManager = RandomGossipManager.newBuilder()
+ .withId(id)
+ .cluster(cluster)
+ .address(ipAddress)
+ .port(port)
+ .settings(settings)
+ .gossipMembers(gossipMembers)
+ .listener(listener)
+ .build();
+>>>>>>> fe196cd... GOSSIP-4: Use builder to create RandomGossipManager (Jaideep Dhok via EGC)
}
public void start() {
http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/900bfda9/src/main/java/org/apache/gossip/manager/random/RandomGossipManager.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/gossip/manager/random/RandomGossipManager.java b/src/main/java/org/apache/gossip/manager/random/RandomGossipManager.java
index 7aa4435..100b237 100644
--- a/src/main/java/org/apache/gossip/manager/random/RandomGossipManager.java
+++ b/src/main/java/org/apache/gossip/manager/random/RandomGossipManager.java
@@ -23,11 +23,89 @@ import org.apache.gossip.event.GossipListener;
import org.apache.gossip.manager.GossipManager;
import org.apache.gossip.manager.impl.OnlyProcessReceivedPassiveGossipThread;
+<<<<<<< HEAD
import java.net.URI;
import java.util.List;
public class RandomGossipManager extends GossipManager {
public RandomGossipManager(String cluster, URI uri, String id,
+=======
+import java.util.ArrayList;
+import java.util.List;
+
+public class RandomGossipManager extends GossipManager {
+
+ public static ManagerBuilder newBuilder() {
+ return new ManagerBuilder();
+ }
+
+ public static final class ManagerBuilder {
+ private String cluster;
+ private String address;
+ private int port;
+ private String id;
+ private GossipSettings settings;
+ private List<GossipMember> gossipMembers;
+ private GossipListener listener;
+
+ private ManagerBuilder() {}
+
+ private void checkArgument(boolean check, String msg) {
+ if (!check) {
+ throw new IllegalArgumentException(msg);
+ }
+ }
+
+ public ManagerBuilder cluster(String cluster) {
+ this.cluster = cluster;
+ return this;
+ }
+
+ public ManagerBuilder address(String address) {
+ this.address = address;
+ return this;
+ }
+
+ public ManagerBuilder port(int port) {
+ this.port = port;
+ return this;
+ }
+
+ public ManagerBuilder withId(String id) {
+ this.id = id;
+ return this;
+ }
+
+ public ManagerBuilder settings(GossipSettings settings) {
+ this.settings = settings;
+ return this;
+ }
+
+ public ManagerBuilder gossipMembers(List<GossipMember> members) {
+ this.gossipMembers = members;
+ return this;
+ }
+
+ public ManagerBuilder listener(GossipListener listener) {
+ this.listener = listener;
+ return this;
+ }
+
+ public RandomGossipManager build() {
+ checkArgument(id != null, "You must specify an id");
+ checkArgument(cluster != null, "You must specify a cluster name");
+ checkArgument(settings != null, "You must specify gossip settings");
+
+ if (this.gossipMembers == null) {
+ this.gossipMembers = new ArrayList<>();
+ }
+
+ return new RandomGossipManager(cluster, address, port, id, settings, gossipMembers, listener);
+ }
+ }
+
+ private RandomGossipManager(String cluster, String address, int port, String id,
+>>>>>>> fe196cd... GOSSIP-4: Use builder to create RandomGossipManager (Jaideep Dhok via EGC)
GossipSettings settings, List<GossipMember> gossipMembers, GossipListener listener) {
super(OnlyProcessReceivedPassiveGossipThread.class, RandomActiveGossipThread.class, cluster,
uri, id, settings, gossipMembers, listener);
http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/900bfda9/src/test/java/org/apache/gossip/manager/RandomGossipManagerBuilderTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/gossip/manager/RandomGossipManagerBuilderTest.java b/src/test/java/org/apache/gossip/manager/RandomGossipManagerBuilderTest.java
new file mode 100644
index 0000000..38b8ab4
--- /dev/null
+++ b/src/test/java/org/apache/gossip/manager/RandomGossipManagerBuilderTest.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gossip.manager;
+
+import org.apache.gossip.GossipMember;
+import org.apache.gossip.GossipSettings;
+import org.apache.gossip.LocalGossipMember;
+import org.apache.gossip.event.GossipListener;
+import org.apache.gossip.event.GossipState;
+import org.apache.gossip.manager.random.RandomGossipManager;
+import org.junit.Assert;
+import org.junit.Test;
+
+import javax.management.Notification;
+import javax.management.NotificationListener;
+import java.util.ArrayList;
+import java.util.List;
+
+public class RandomGossipManagerBuilderTest {
+
+ public static class TestGossipListener implements GossipListener {
+ @Override
+ public void gossipEvent(GossipMember member, GossipState state) {
+ System.out.println("Got gossip event");
+ }
+ }
+
+ public static class TestNotificationListener implements NotificationListener {
+ @Override
+ public void handleNotification(Notification notification, Object o) {
+ System.out.println("Got notification event");
+ }
+ }
+
+ @Test(expected=IllegalArgumentException.class)
+ public void idShouldNotBeNull() {
+ RandomGossipManager.newBuilder().cluster("aCluster").build();
+ }
+
+ @Test(expected=IllegalArgumentException.class)
+ public void clusterShouldNotBeNull() {
+ RandomGossipManager.newBuilder().withId("id").build();
+ }
+
+ @Test(expected=IllegalArgumentException.class)
+ public void settingsShouldNotBeNull() {
+ RandomGossipManager.newBuilder().withId("id").cluster("aCluster").build();
+ }
+
+ @Test
+ public void createMembersListIfNull() {
+ RandomGossipManager gossipManager = RandomGossipManager.newBuilder()
+ .withId("id")
+ .cluster("aCluster")
+ .port(8080)
+ .address("localhost")
+ .settings(new GossipSettings())
+ .gossipMembers(null).build();
+
+ Assert.assertNotNull(gossipManager.getMemberList());
+ }
+
+ @Test
+ public void useMemberListIfProvided() {
+ LocalGossipMember member = new LocalGossipMember("aCluster", "localhost", 2000, "aGossipMember",
+ System.currentTimeMillis(), new TestNotificationListener(), 60000);
+
+ List<GossipMember> memberList = new ArrayList<>();
+ memberList.add(member);
+
+ RandomGossipManager gossipManager = RandomGossipManager.newBuilder()
+ .withId("id")
+ .cluster("aCluster")
+ .settings(new GossipSettings())
+ .gossipMembers(memberList).build();
+
+ Assert.assertEquals(1, gossipManager.getMemberList().size());
+ Assert.assertEquals(member.getId(), gossipManager.getMemberList().get(0).getId());
+ }
+
+}
\ No newline at end of file
[05/12] incubator-gossip git commit: Final final
Posted by ec...@apache.org.
Final final
Project: http://git-wip-us.apache.org/repos/asf/incubator-gossip/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gossip/commit/3366f636
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gossip/tree/3366f636
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gossip/diff/3366f636
Branch: refs/heads/master
Commit: 3366f6364142ad8460c7899fff5a90b67b81976d
Parents: 266e039
Author: Edward Capriolo <ed...@gmail.com>
Authored: Mon May 16 19:10:53 2016 -0400
Committer: Edward Capriolo <ed...@gmail.com>
Committed: Tue Jun 7 22:52:53 2016 -0400
----------------------------------------------------------------------
.../google/code/gossip/manager/PassiveGossipThread.java | 10 +++-------
1 file changed, 3 insertions(+), 7 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/3366f636/src/main/java/com/google/code/gossip/manager/PassiveGossipThread.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/google/code/gossip/manager/PassiveGossipThread.java b/src/main/java/com/google/code/gossip/manager/PassiveGossipThread.java
index ec11cfe..6bf1494 100644
--- a/src/main/java/com/google/code/gossip/manager/PassiveGossipThread.java
+++ b/src/main/java/com/google/code/gossip/manager/PassiveGossipThread.java
@@ -29,10 +29,6 @@ import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.log4j.Logger;
import org.codehaus.jackson.map.ObjectMapper;
-import org.json.JSONArray;
-import org.json.JSONException;
-import org.json.JSONObject;
-
import com.google.code.gossip.GossipMember;
import com.google.code.gossip.GossipService;
import com.google.code.gossip.RemoteGossipMember;
@@ -49,15 +45,15 @@ abstract public class PassiveGossipThread implements Runnable {
public static final Logger LOGGER = Logger.getLogger(PassiveGossipThread.class);
/** The socket used for the passive thread of the gossip service. */
- private DatagramSocket server;
+ private final DatagramSocket server;
private final GossipManager gossipManager;
- private AtomicBoolean keepRunning;
+ private final AtomicBoolean keepRunning;
private final String cluster;
- private ObjectMapper MAPPER = new ObjectMapper();
+ private final ObjectMapper MAPPER = new ObjectMapper();
public PassiveGossipThread(GossipManager gossipManager) {
this.gossipManager = gossipManager;