You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gossip.apache.org by ec...@apache.org on 2017/05/06 15:51:42 UTC
[1/2] incubator-gossip git commit: GOSSIP-85 Factor out PassiveGossipThread
Repository: incubator-gossip
Updated Branches:
refs/heads/master e3010c854 -> c62ebaf9b
http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/c62ebaf9/gossip-transport-udp/src/main/java/org/apache/gossip/transport/udp/UdpTransportManager.java
----------------------------------------------------------------------
diff --git a/gossip-transport-udp/src/main/java/org/apache/gossip/transport/udp/UdpTransportManager.java b/gossip-transport-udp/src/main/java/org/apache/gossip/transport/udp/UdpTransportManager.java
index 3f509a6..d6aaa15 100644
--- a/gossip-transport-udp/src/main/java/org/apache/gossip/transport/udp/UdpTransportManager.java
+++ b/gossip-transport-udp/src/main/java/org/apache/gossip/transport/udp/UdpTransportManager.java
@@ -19,6 +19,7 @@ package org.apache.gossip.transport.udp;
import org.apache.gossip.manager.GossipCore;
import org.apache.gossip.manager.GossipManager;
+import org.apache.gossip.model.Base;
import org.apache.gossip.transport.AbstractTransportManager;
import org.apache.log4j.Logger;
@@ -30,12 +31,13 @@ import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.net.SocketException;
import java.net.URI;
+import java.util.concurrent.atomic.AtomicBoolean;
/**
* This class is constructed by reflection in GossipManager.
* It manages transport (byte read/write) operations over UDP.
*/
-public class UdpTransportManager extends AbstractTransportManager {
+public class UdpTransportManager extends AbstractTransportManager implements Runnable {
public static final Logger LOGGER = Logger.getLogger(UdpTransportManager.class);
@@ -44,12 +46,14 @@ public class UdpTransportManager extends AbstractTransportManager {
private final int soTimeout;
+ private final Thread me;
+
+ private final AtomicBoolean keepRunning = new AtomicBoolean(true);
+
/** required for reflection to work! */
public UdpTransportManager(GossipManager gossipManager, GossipCore gossipCore) {
super(gossipManager, gossipCore);
-
soTimeout = gossipManager.getSettings().getGossipInterval() * 2;
-
try {
SocketAddress socketAddress = new InetSocketAddress(gossipManager.getMyself().getUri().getHost(),
gossipManager.getMyself().getUri().getPort());
@@ -58,12 +62,38 @@ public class UdpTransportManager extends AbstractTransportManager {
LOGGER.warn(ex);
throw new RuntimeException(ex);
}
+ me = new Thread(this);
}
@Override
+ public void run() {
+ while (keepRunning.get()) {
+ try {
+ byte[] buf = read();
+ try {
+ Base message = gossipManager.getProtocolManager().read(buf);
+ gossipCore.receive(message);
+ //TODO this is suspect
+ gossipManager.getMemberStateRefresher().run();
+ } catch (RuntimeException ex) {//TODO trap json exception
+ LOGGER.error("Unable to process message", ex);
+ }
+ } catch (IOException e) {
+ // InterruptedException are completely normal here because of the blocking lifecycle.
+ if (!(e.getCause() instanceof InterruptedException)) {
+ LOGGER.error(e);
+ }
+ keepRunning.set(false);
+ }
+ }
+ }
+
+ @Override
public void shutdown() {
+ keepRunning.set(false);
server.close();
super.shutdown();
+ me.interrupt();
}
/**
@@ -81,13 +111,13 @@ public class UdpTransportManager extends AbstractTransportManager {
@Override
public void send(URI endpoint, byte[] buf) throws IOException {
- DatagramSocket socket = new DatagramSocket();
- socket.setSoTimeout(soTimeout);
- InetAddress dest = InetAddress.getByName(endpoint.getHost());
- DatagramPacket payload = new DatagramPacket(buf, buf.length, dest, endpoint.getPort());
- socket.send(payload);
// todo: investigate UDP socket reuse. It would save a little setup/teardown time wrt to the local socket.
- socket.close();
+ try (DatagramSocket socket = new DatagramSocket()){
+ socket.setSoTimeout(soTimeout);
+ InetAddress dest = InetAddress.getByName(endpoint.getHost());
+ DatagramPacket payload = new DatagramPacket(buf, buf.length, dest, endpoint.getPort());
+ socket.send(payload);
+ }
}
private void debug(byte[] jsonBytes) {
@@ -96,4 +126,10 @@ public class UdpTransportManager extends AbstractTransportManager {
LOGGER.debug("Received message ( bytes): " + receivedMessage);
}
}
+
+ @Override
+ public void startEndpoint() {
+ me.start();
+ }
+
}
http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/c62ebaf9/gossip-transport-udp/src/test/java/org/apache/gossip/transport/udp/UdpTransportIntegrationTest.java
----------------------------------------------------------------------
diff --git a/gossip-transport-udp/src/test/java/org/apache/gossip/transport/udp/UdpTransportIntegrationTest.java b/gossip-transport-udp/src/test/java/org/apache/gossip/transport/udp/UdpTransportIntegrationTest.java
index 5258374..8a27d0a 100644
--- a/gossip-transport-udp/src/test/java/org/apache/gossip/transport/udp/UdpTransportIntegrationTest.java
+++ b/gossip-transport-udp/src/test/java/org/apache/gossip/transport/udp/UdpTransportIntegrationTest.java
@@ -17,14 +17,9 @@
*/
package org.apache.gossip.transport.udp;
-import org.apache.gossip.GossipSettings;
-import org.junit.Assert;
import org.junit.Ignore;
import org.junit.Test;
-import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
-
public class UdpTransportIntegrationTest {
// It's currently impossible to create a UdpTransportManager without bringing up an entire stack.
http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/c62ebaf9/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 97aa409..1c48306 100644
--- a/pom.xml
+++ b/pom.xml
@@ -58,6 +58,7 @@
<module>gossip-base</module>
<module>gossip-transport-udp</module>
<module>gossip-protocol-jackson</module>
+ <module>gossip-itest</module>
</modules>
<description>A peer to peer cluster discovery service</description>
[2/2] incubator-gossip git commit: GOSSIP-85 Factor out PassiveGossipThread
Posted by ec...@apache.org.
GOSSIP-85 Factor out PassiveGossipThread
Project: http://git-wip-us.apache.org/repos/asf/incubator-gossip/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gossip/commit/c62ebaf9
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gossip/tree/c62ebaf9
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gossip/diff/c62ebaf9
Branch: refs/heads/master
Commit: c62ebaf9b6054b669ee77d61497d51d1b382309d
Parents: e3010c8
Author: Edward Capriolo <ed...@gmail.com>
Authored: Sat Apr 29 19:45:16 2017 -0400
Committer: Edward Capriolo <ed...@gmail.com>
Committed: Wed May 3 16:57:50 2017 -0400
----------------------------------------------------------------------
gossip-base/pom.xml | 2 +-
.../java/org/apache/gossip/GossipSettings.java | 3 +-
.../apache/gossip/manager/GossipManager.java | 4 +-
.../gossip/manager/PassiveGossipThread.java | 76 ------
.../gossip/manager/RingStatePersister.java | 3 -
.../transport/AbstractTransportManager.java | 16 +-
.../test/java/org/apache/gossip/DataTest.java | 240 -------------------
.../org/apache/gossip/IdAndPropertyTest.java | 93 -------
.../org/apache/gossip/ShutdownDeadtimeTest.java | 150 ------------
.../org/apache/gossip/SignedMessageTest.java | 119 ---------
.../org/apache/gossip/StartupSettingsTest.java | 91 -------
.../org/apache/gossip/TenNodeThreeSeedTest.java | 94 --------
.../manager/handlers/MessageHandlerTest.java | 2 +-
.../transport/UnitTestTransportManager.java | 3 +-
gossip-itest/pom.xml | 88 +++++++
.../test/java/org/apache/gossip/DataTest.java | 238 ++++++++++++++++++
.../org/apache/gossip/IdAndPropertyTest.java | 91 +++++++
.../org/apache/gossip/ShutdownDeadtimeTest.java | 148 ++++++++++++
.../org/apache/gossip/SignedMessageTest.java | 117 +++++++++
.../org/apache/gossip/StartupSettingsTest.java | 91 +++++++
.../org/apache/gossip/TenNodeThreeSeedTest.java | 92 +++++++
gossip-protocol-jackson/pom.xml | 6 +-
.../gossip/protocol/json/JacksonTest.java | 6 -
.../gossip/protocol/json/TestMessage.java | 1 +
gossip-transport-udp/pom.xml | 4 +-
.../transport/udp/UdpTransportManager.java | 54 ++++-
.../udp/UdpTransportIntegrationTest.java | 5 -
pom.xml | 1 +
28 files changed, 931 insertions(+), 907 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/c62ebaf9/gossip-base/pom.xml
----------------------------------------------------------------------
diff --git a/gossip-base/pom.xml b/gossip-base/pom.xml
index b9739f6..34f346c 100644
--- a/gossip-base/pom.xml
+++ b/gossip-base/pom.xml
@@ -75,4 +75,4 @@
</dependency>
</dependencies>
-</project>
\ No newline at end of file
+</project>
http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/c62ebaf9/gossip-base/src/main/java/org/apache/gossip/GossipSettings.java
----------------------------------------------------------------------
diff --git a/gossip-base/src/main/java/org/apache/gossip/GossipSettings.java b/gossip-base/src/main/java/org/apache/gossip/GossipSettings.java
index 2ceb453..792af85 100644
--- a/gossip-base/src/main/java/org/apache/gossip/GossipSettings.java
+++ b/gossip-base/src/main/java/org/apache/gossip/GossipSettings.java
@@ -44,7 +44,7 @@ public class GossipSettings {
private String distribution = "normal";
private String activeGossipClass = "org.apache.gossip.manager.SimpleActiveGossipper";
-
+
private String transportManagerClass = "org.apache.gossip.transport.udp.UdpTransportManager";
private String protocolManagerClass = "org.apache.gossip.protocol.json.JacksonProtocolManager";
@@ -241,4 +241,5 @@ public class GossipSettings {
public void setProtocolManagerClass(String protocolManagerClass) {
this.protocolManagerClass = protocolManagerClass;
}
+
}
http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/c62ebaf9/gossip-base/src/main/java/org/apache/gossip/manager/GossipManager.java
----------------------------------------------------------------------
diff --git a/gossip-base/src/main/java/org/apache/gossip/manager/GossipManager.java b/gossip-base/src/main/java/org/apache/gossip/manager/GossipManager.java
index b1752cd..133a79f 100644
--- a/gossip-base/src/main/java/org/apache/gossip/manager/GossipManager.java
+++ b/gossip-base/src/main/java/org/apache/gossip/manager/GossipManager.java
@@ -50,7 +50,9 @@ public abstract class GossipManager {
public static final Logger LOGGER = Logger.getLogger(GossipManager.class);
// this mapper is used for ring and user-data persistence only. NOT messages.
- public static final ObjectMapper metdataObjectMapper = new ObjectMapper() {{
+ public static final ObjectMapper metdataObjectMapper = new ObjectMapper() {
+ private static final long serialVersionUID = 1L;
+ {
enableDefaultTyping();
configure(JsonGenerator.Feature.WRITE_NUMBERS_AS_STRINGS, false);
}};
http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/c62ebaf9/gossip-base/src/main/java/org/apache/gossip/manager/PassiveGossipThread.java
----------------------------------------------------------------------
diff --git a/gossip-base/src/main/java/org/apache/gossip/manager/PassiveGossipThread.java b/gossip-base/src/main/java/org/apache/gossip/manager/PassiveGossipThread.java
deleted file mode 100644
index 03a874c..0000000
--- a/gossip-base/src/main/java/org/apache/gossip/manager/PassiveGossipThread.java
+++ /dev/null
@@ -1,76 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.gossip.manager;
-
-import java.io.IOException;
-
-import java.util.concurrent.atomic.AtomicBoolean;
-
-import org.apache.gossip.model.Base;
-import org.apache.log4j.Logger;
-
-
-/**
- * This class handles the passive cycle,
- * where this client has received an incoming message.
- */
-public class PassiveGossipThread implements Runnable {
-
- public static final Logger LOGGER = Logger.getLogger(PassiveGossipThread.class);
-
-
- private final AtomicBoolean keepRunning;
- private final GossipCore gossipCore;
- private final GossipManager gossipManager;
-
- public PassiveGossipThread(GossipManager gossipManager, GossipCore gossipCore) {
- this.gossipManager = gossipManager;
- this.gossipCore = gossipCore;
- if (gossipManager.getMyself().getClusterName() == null){
- throw new IllegalArgumentException("Cluster was null");
- }
-
- keepRunning = new AtomicBoolean(true);
- }
-
- @Override
- public void run() {
- while (keepRunning.get()) {
- try {
- byte[] buf = gossipManager.getTransportManager().read();
- try {
- Base message = gossipManager.getProtocolManager().read(buf);
- gossipCore.receive(message);
- gossipManager.getMemberStateRefresher().run();
- } catch (RuntimeException ex) {//TODO trap json exception
- LOGGER.error("Unable to process message", ex);
- }
- } catch (IOException e) {
- // InterruptedException are completely normal here because of the blocking lifecycle.
- if (!(e.getCause() instanceof InterruptedException)) {
- LOGGER.error(e);
- }
- keepRunning.set(false);
- }
- }
- }
-
- public void requestStop() {
- keepRunning.set(false);
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/c62ebaf9/gossip-base/src/main/java/org/apache/gossip/manager/RingStatePersister.java
----------------------------------------------------------------------
diff --git a/gossip-base/src/main/java/org/apache/gossip/manager/RingStatePersister.java b/gossip-base/src/main/java/org/apache/gossip/manager/RingStatePersister.java
index 0af9f12..5334ad4 100644
--- a/gossip-base/src/main/java/org/apache/gossip/manager/RingStatePersister.java
+++ b/gossip-base/src/main/java/org/apache/gossip/manager/RingStatePersister.java
@@ -22,14 +22,11 @@ import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.util.ArrayList;
-import java.util.Collections;
import java.util.List;
import java.util.NavigableSet;
-import com.fasterxml.jackson.core.JsonGenerator;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.gossip.LocalMember;
-import org.apache.gossip.crdt.CrdtModule;
import org.apache.log4j.Logger;
public class RingStatePersister implements Runnable {
http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/c62ebaf9/gossip-base/src/main/java/org/apache/gossip/transport/AbstractTransportManager.java
----------------------------------------------------------------------
diff --git a/gossip-base/src/main/java/org/apache/gossip/transport/AbstractTransportManager.java b/gossip-base/src/main/java/org/apache/gossip/transport/AbstractTransportManager.java
index 33db038..82b0dfb 100644
--- a/gossip-base/src/main/java/org/apache/gossip/transport/AbstractTransportManager.java
+++ b/gossip-base/src/main/java/org/apache/gossip/transport/AbstractTransportManager.java
@@ -21,7 +21,6 @@ import com.codahale.metrics.MetricRegistry;
import org.apache.gossip.manager.AbstractActiveGossiper;
import org.apache.gossip.manager.GossipCore;
import org.apache.gossip.manager.GossipManager;
-import org.apache.gossip.manager.PassiveGossipThread;
import org.apache.gossip.utils.ReflectionUtils;
import org.apache.log4j.Logger;
@@ -36,14 +35,14 @@ public abstract class AbstractTransportManager implements TransportManager {
public static final Logger LOGGER = Logger.getLogger(AbstractTransportManager.class);
- private final PassiveGossipThread passiveGossipThread;
private final ExecutorService gossipThreadExecutor;
-
private final AbstractActiveGossiper activeGossipThread;
+ protected final GossipManager gossipManager;
+ protected final GossipCore gossipCore;
public AbstractTransportManager(GossipManager gossipManager, GossipCore gossipCore) {
-
- passiveGossipThread = new PassiveGossipThread(gossipManager, gossipCore);
+ this.gossipManager = gossipManager;
+ this.gossipCore = gossipCore;
gossipThreadExecutor = Executors.newCachedThreadPool();
activeGossipThread = ReflectionUtils.constructWithReflection(
gossipManager.getSettings().getActiveGossipClass(),
@@ -58,7 +57,6 @@ public abstract class AbstractTransportManager implements TransportManager {
// shut down threads etc.
@Override
public void shutdown() {
- passiveGossipThread.requestStop();
gossipThreadExecutor.shutdown();
if (activeGossipThread != null) {
activeGossipThread.shutdown();
@@ -77,11 +75,9 @@ public abstract class AbstractTransportManager implements TransportManager {
@Override
public void startActiveGossiper() {
- activeGossipThread.init();
+ activeGossipThread.init();
}
@Override
- public void startEndpoint() {
- gossipThreadExecutor.execute(passiveGossipThread);
- }
+ public abstract void startEndpoint();
}
http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/c62ebaf9/gossip-base/src/test/java/org/apache/gossip/DataTest.java
----------------------------------------------------------------------
diff --git a/gossip-base/src/test/java/org/apache/gossip/DataTest.java b/gossip-base/src/test/java/org/apache/gossip/DataTest.java
deleted file mode 100644
index bb33dc2..0000000
--- a/gossip-base/src/test/java/org/apache/gossip/DataTest.java
+++ /dev/null
@@ -1,240 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.gossip;
-
-import java.net.URI;
-import java.net.URISyntaxException;
-import java.net.UnknownHostException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-import java.util.UUID;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.gossip.crdt.GrowOnlyCounter;
-import org.apache.gossip.crdt.GrowOnlySet;
-import org.apache.gossip.crdt.OrSet;
-import org.apache.gossip.manager.GossipManager;
-import org.apache.gossip.manager.GossipManagerBuilder;
-import org.apache.gossip.model.PerNodeDataMessage;
-import org.apache.gossip.model.SharedDataMessage;
-import org.junit.Test;
-
-import io.teknek.tunit.TUnit;
-
-public class DataTest extends AbstractIntegrationBase {
-
- private String orSetKey = "cror";
- private String gCounterKey = "crdtgc";
-
- @Test
- public void dataTest() throws InterruptedException, UnknownHostException, URISyntaxException{
- GossipSettings settings = new GossipSettings();
- settings.setPersistRingState(false);
- settings.setPersistDataState(false);
- settings.setTransportManagerClass("org.apache.gossip.transport.UnitTestTransportManager");
- settings.setProtocolManagerClass("org.apache.gossip.protocol.UnitTestProtocolManager");
- String cluster = UUID.randomUUID().toString();
- int seedNodes = 1;
- List<Member> startupMembers = new ArrayList<>();
- for (int i = 1; i < seedNodes+1; ++i) {
- URI uri = new URI("udp://" + "127.0.0.1" + ":" + (50000 + i));
- startupMembers.add(new RemoteMember(cluster, uri, i + ""));
- }
- final List<GossipManager> clients = new ArrayList<>();
- final int clusterMembers = 2;
- for (int i = 1; i < clusterMembers + 1; ++i) {
- URI uri = new URI("udp://" + "127.0.0.1" + ":" + (50000 + i));
- GossipManager gossipService = GossipManagerBuilder.newBuilder().cluster(cluster).uri(uri)
- .id(i + "").gossipMembers(startupMembers).gossipSettings(settings).build();
- clients.add(gossipService);
- gossipService.init();
- register(gossipService);
- }
- TUnit.assertThat(() -> {
- int total = 0;
- for (int i = 0; i < clusterMembers; ++i) {
- total += clients.get(i).getLiveMembers().size();
- }
- return total;
- }).afterWaitingAtMost(20, TimeUnit.SECONDS).isEqualTo(2);
- clients.get(0).gossipPerNodeData(msg());
- clients.get(0).gossipSharedData(sharedMsg());
-
- TUnit.assertThat(()-> {
- PerNodeDataMessage x = clients.get(1).findPerNodeGossipData(1 + "", "a");
- if (x == null)
- return "";
- else
- return x.getPayload();
- }).afterWaitingAtMost(20, TimeUnit.SECONDS).isEqualTo("b");
-
- TUnit.assertThat(() -> {
- SharedDataMessage x = clients.get(1).findSharedGossipData("a");
- if (x == null)
- return "";
- else
- return x.getPayload();
- }).afterWaitingAtMost(20, TimeUnit.SECONDS).isEqualTo("c");
-
-
- givenDifferentDatumsInSet(clients);
- assertThatListIsMerged(clients);
-
- givenOrs(clients);
- assertThatOrSetIsMerged(clients);
- dropIt(clients);
- assertThatOrSetDelIsMerged(clients);
-
-
- // test g counter
- givenDifferentIncrement(clients);
- assertThatCountIsUpdated(clients, 3);
- givenIncreaseOther(clients);
- assertThatCountIsUpdated(clients, 7);
-
- for (int i = 0; i < clusterMembers; ++i) {
- clients.get(i).shutdown();
- }
- }
-
- private void givenDifferentIncrement(final List<GossipManager> clients) {
- {
- SharedDataMessage d = new SharedDataMessage();
- d.setKey(gCounterKey);
- d.setPayload(new GrowOnlyCounter(new GrowOnlyCounter.Builder(clients.get(0)).increment(1L)));
- d.setExpireAt(Long.MAX_VALUE);
- d.setTimestamp(System.currentTimeMillis());
- clients.get(0).merge(d);
- }
- {
- SharedDataMessage d = new SharedDataMessage();
- d.setKey(gCounterKey);
- d.setPayload(new GrowOnlyCounter(new GrowOnlyCounter.Builder(clients.get(1)).increment(2L)));
- d.setExpireAt(Long.MAX_VALUE);
- d.setTimestamp(System.currentTimeMillis());
- clients.get(1).merge(d);
- }
- }
-
- private void givenIncreaseOther(final List<GossipManager> clients) {
- GrowOnlyCounter gc = (GrowOnlyCounter) clients.get(1).findCrdt(gCounterKey);
- GrowOnlyCounter gc2 = new GrowOnlyCounter(gc,
- new GrowOnlyCounter.Builder(clients.get(1)).increment(4L));
-
- SharedDataMessage d = new SharedDataMessage();
- d.setKey(gCounterKey);
- d.setPayload(gc2);
- d.setExpireAt(Long.MAX_VALUE);
- d.setTimestamp(System.currentTimeMillis());
- clients.get(1).merge(d);
- }
-
- private void givenOrs(List<GossipManager> clients) {
- {
- SharedDataMessage d = new SharedDataMessage();
- d.setKey(orSetKey);
- d.setPayload(new OrSet<String>("1", "2"));
- d.setExpireAt(Long.MAX_VALUE);
- d.setTimestamp(System.currentTimeMillis());
- clients.get(0).merge(d);
- }
- {
- SharedDataMessage d = new SharedDataMessage();
- d.setKey(orSetKey);
- d.setPayload(new OrSet<String>("3", "4"));
- d.setExpireAt(Long.MAX_VALUE);
- d.setTimestamp(System.currentTimeMillis());
- clients.get(1).merge(d);
- }
- }
-
- private void dropIt(List<GossipManager> clients) {
- @SuppressWarnings("unchecked")
- OrSet<String> o = (OrSet<String>) clients.get(0).findCrdt(orSetKey);
- OrSet<String> o2 = new OrSet<String>(o, new OrSet.Builder<String>().remove("3"));
- SharedDataMessage d = new SharedDataMessage();
- d.setKey(orSetKey);
- d.setPayload(o2);
- d.setExpireAt(Long.MAX_VALUE);
- d.setTimestamp(System.currentTimeMillis());
- clients.get(0).merge(d);
- }
-
- private void assertThatOrSetIsMerged(final List<GossipManager> clients){
- TUnit.assertThat(() -> {
- return clients.get(0).findCrdt(orSetKey).value();
- }).afterWaitingAtMost(10, TimeUnit.SECONDS).isEqualTo(new OrSet<String>("1", "2", "3", "4").value());
- TUnit.assertThat(() -> {
- return clients.get(1).findCrdt(orSetKey).value();
- }).afterWaitingAtMost(10, TimeUnit.SECONDS).isEqualTo(new OrSet<String>("1", "2", "3", "4").value());
- }
-
- private void assertThatOrSetDelIsMerged(final List<GossipManager> clients){
- TUnit.assertThat(() -> {
- return clients.get(0).findCrdt(orSetKey);
- }).afterWaitingAtMost(10, TimeUnit.SECONDS).equals(new OrSet<String>("1", "2", "4"));
- }
-
- private void givenDifferentDatumsInSet(final List<GossipManager> clients){
- clients.get(0).merge(CrdtMessage("1"));
- clients.get(1).merge(CrdtMessage("2"));
- }
-
-
- private void assertThatCountIsUpdated(final List<GossipManager> clients, long finalCount) {
- TUnit.assertThat(() -> {
- return clients.get(0).findCrdt(gCounterKey);
- }).afterWaitingAtMost(10, TimeUnit.SECONDS).isEqualTo(new GrowOnlyCounter(
- new GrowOnlyCounter.Builder(clients.get(0)).increment(finalCount)));
- }
-
- private void assertThatListIsMerged(final List<GossipManager> clients){
- TUnit.assertThat(() -> {
- return clients.get(0).findCrdt("cr");
- }).afterWaitingAtMost(10, TimeUnit.SECONDS).isEqualTo(new GrowOnlySet<String>(Arrays.asList("1","2")));
- }
-
- private SharedDataMessage CrdtMessage(String item){
- SharedDataMessage d = new SharedDataMessage();
- d.setKey("cr");
- d.setPayload(new GrowOnlySet<String>( Arrays.asList(item)));
- d.setExpireAt(Long.MAX_VALUE);
- d.setTimestamp(System.currentTimeMillis());
- return d;
- }
-
- private PerNodeDataMessage msg(){
- PerNodeDataMessage g = new PerNodeDataMessage();
- g.setExpireAt(Long.MAX_VALUE);
- g.setKey("a");
- g.setPayload("b");
- g.setTimestamp(System.currentTimeMillis());
- return g;
- }
-
- private SharedDataMessage sharedMsg(){
- SharedDataMessage g = new SharedDataMessage();
- g.setExpireAt(Long.MAX_VALUE);
- g.setKey("a");
- g.setPayload("c");
- g.setTimestamp(System.currentTimeMillis());
- return g;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/c62ebaf9/gossip-base/src/test/java/org/apache/gossip/IdAndPropertyTest.java
----------------------------------------------------------------------
diff --git a/gossip-base/src/test/java/org/apache/gossip/IdAndPropertyTest.java b/gossip-base/src/test/java/org/apache/gossip/IdAndPropertyTest.java
deleted file mode 100644
index 1b6a32a..0000000
--- a/gossip-base/src/test/java/org/apache/gossip/IdAndPropertyTest.java
+++ /dev/null
@@ -1,93 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.gossip;
-
-import java.net.URI;
-import java.net.URISyntaxException;
-import java.net.UnknownHostException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.gossip.manager.DatacenterRackAwareActiveGossiper;
-import org.apache.gossip.manager.GossipManager;
-import org.apache.gossip.manager.GossipManagerBuilder;
-import org.junit.jupiter.api.Test;
-import org.junit.platform.runner.JUnitPlatform;
-import org.junit.runner.RunWith;
-
-import io.teknek.tunit.TUnit;
-
-@RunWith(JUnitPlatform.class)
-public class IdAndPropertyTest extends AbstractIntegrationBase {
-
- @Test
- public void testDatacenterRackGossiper() throws URISyntaxException, UnknownHostException, InterruptedException {
- GossipSettings settings = new GossipSettings();
- settings.setActiveGossipClass(DatacenterRackAwareActiveGossiper.class.getName());
- settings.setTransportManagerClass("org.apache.gossip.transport.UnitTestTransportManager");
- settings.setProtocolManagerClass("org.apache.gossip.protocol.UnitTestProtocolManager");
- List<Member> startupMembers = new ArrayList<>();
- Map<String, String> x = new HashMap<>();
- x.put("a", "b");
- x.put("datacenter", "dc1");
- x.put("rack", "rack1");
- GossipManager gossipService1 = GossipManagerBuilder.newBuilder()
- .cluster("a")
- .uri(new URI("udp://" + "127.0.0.1" + ":" + (29000 + 0)))
- .id("0")
- .properties(x)
- .gossipMembers(startupMembers)
- .gossipSettings(settings).build();
- gossipService1.init();
- register(gossipService1);
-
- Map<String, String> y = new HashMap<>();
- y.put("a", "c");
- y.put("datacenter", "dc2");
- y.put("rack", "rack2");
- GossipManager gossipService2 = GossipManagerBuilder.newBuilder().cluster("a")
- .uri( new URI("udp://" + "127.0.0.1" + ":" + (29000 + 10)))
- .id("1")
- .properties(y)
- .gossipMembers(Arrays.asList(new RemoteMember("a",
- new URI("udp://" + "127.0.0.1" + ":" + (29000 + 0)), "0")))
- .gossipSettings(settings).build();
- gossipService2.init();
- register(gossipService2);
-
- TUnit.assertThat(() -> {
- String value = "";
- try {
- value = gossipService1.getLiveMembers().get(0).getProperties().get("a");
- } catch (RuntimeException e){ }
- return value;
- }).afterWaitingAtMost(10, TimeUnit.SECONDS).isEqualTo("c");
-
- TUnit.assertThat(() -> {
- String value = "";
- try {
- value = gossipService2.getLiveMembers().get(0).getProperties().get("a");
- } catch (RuntimeException e){ }
- return value;
- }).afterWaitingAtMost(10, TimeUnit.SECONDS).isEqualTo("b");
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/c62ebaf9/gossip-base/src/test/java/org/apache/gossip/ShutdownDeadtimeTest.java
----------------------------------------------------------------------
diff --git a/gossip-base/src/test/java/org/apache/gossip/ShutdownDeadtimeTest.java b/gossip-base/src/test/java/org/apache/gossip/ShutdownDeadtimeTest.java
deleted file mode 100644
index 30c52bc..0000000
--- a/gossip-base/src/test/java/org/apache/gossip/ShutdownDeadtimeTest.java
+++ /dev/null
@@ -1,150 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.gossip;
-
-import io.teknek.tunit.TUnit;
-
-import java.net.URI;
-import java.net.URISyntaxException;
-import java.net.UnknownHostException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.Random;
-import java.util.UUID;
-import java.util.concurrent.Callable;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.gossip.manager.GossipManager;
-import org.apache.gossip.manager.GossipManagerBuilder;
-import org.apache.log4j.Logger;
-
-import org.junit.platform.runner.JUnitPlatform;
-import org.junit.jupiter.api.Test;
-
-import org.junit.runner.RunWith;
-
-@RunWith(JUnitPlatform.class)
-public class ShutdownDeadtimeTest {
-
- private static final Logger log = Logger.getLogger(ShutdownDeadtimeTest.class);
-
- // Note: this test is floppy depending on the values in GossipSettings (smaller values seem to do harm), and the
- // sleep that happens after startup.
- @Test
- public void DeadNodesDoNotComeAliveAgain()
- throws InterruptedException, UnknownHostException, URISyntaxException {
- GossipSettings settings = new GossipSettings(100, 10000, 1000, 1, 10.0, "normal");
- settings.setTransportManagerClass("org.apache.gossip.transport.UnitTestTransportManager");
- settings.setProtocolManagerClass("org.apache.gossip.protocol.UnitTestProtocolManager");
- settings.setPersistRingState(false);
- settings.setPersistDataState(false);
- String cluster = UUID.randomUUID().toString();
- int seedNodes = 3;
- List<Member> startupMembers = new ArrayList<>();
- for (int i = 1; i < seedNodes + 1; ++i) {
- URI uri = new URI("udp://" + "127.0.0.1" + ":" + (30300 + i));
- startupMembers.add(new RemoteMember(cluster, uri, i + ""));
- }
- final List<GossipManager> clients = Collections.synchronizedList(new ArrayList<GossipManager>());
- final int clusterMembers = 5;
- for (int i = 1; i < clusterMembers + 1; ++i) {
- URI uri = new URI("udp://" + "127.0.0.1" + ":" + (30300 + i));
- GossipManager gossipService = GossipManagerBuilder.newBuilder()
- .cluster(cluster)
- .uri(uri)
- .id(i + "")
- .gossipMembers(startupMembers)
- .gossipSettings(settings)
- .build();
- clients.add(gossipService);
- gossipService.init();
- Thread.sleep(1000);
- }
- TUnit.assertThat(new Callable<Integer>() {
- public Integer call() throws Exception {
- int total = 0;
- for (int i = 0; i < clusterMembers; ++i) {
- total += clients.get(i).getLiveMembers().size();
- }
- return total;
- }
- }).afterWaitingAtMost(40, TimeUnit.SECONDS).isEqualTo(20);
-
- // shutdown one client and verify that one client is lost.
- Random r = new Random();
- int randomClientId = r.nextInt(clusterMembers);
- log.info("shutting down " + randomClientId);
- final int shutdownPort = clients.get(randomClientId).getMyself().getUri()
- .getPort();
- final String shutdownId = clients.get(randomClientId).getMyself().getId();
- clients.get(randomClientId).shutdown();
- TUnit.assertThat(new Callable<Integer>() {
- public Integer call() throws Exception {
- int total = 0;
- for (int i = 0; i < clusterMembers; ++i) {
- total += clients.get(i).getLiveMembers().size();
- }
- return total;
- }
- }).afterWaitingAtMost(40, TimeUnit.SECONDS).isEqualTo(16);
- clients.remove(randomClientId);
-
- TUnit.assertThat(new Callable<Integer>() {
- public Integer call() throws Exception {
- int total = 0;
- for (int i = 0; i < clusterMembers - 1; ++i) {
- total += clients.get(i).getDeadMembers().size();
- }
- return total;
- }
- }).afterWaitingAtMost(50, TimeUnit.SECONDS).isEqualTo(4);
-
- URI uri = new URI("udp://" + "127.0.0.1" + ":" + shutdownPort);
- // start client again
- GossipManager gossipService = GossipManagerBuilder.newBuilder()
- .gossipSettings(settings)
- .cluster(cluster)
- .uri(uri)
- .id(shutdownId+"")
- .gossipMembers(startupMembers)
- .build();
- clients.add(gossipService);
- gossipService.init();
-
- // verify that the client is alive again for every node
- TUnit.assertThat(new Callable<Integer>() {
- public Integer call() throws Exception {
- int total = 0;
- for (int i = 0; i < clusterMembers; ++i) {
- total += clients.get(i).getLiveMembers().size();
- }
- return total;
- }
- }).afterWaitingAtMost(60, TimeUnit.SECONDS).isEqualTo(20);
-
- for (int i = 0; i < clusterMembers; ++i) {
- final int j = i;
- new Thread() {
- public void run(){
- clients.get(j).shutdown();
- }
- }.start();
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/c62ebaf9/gossip-base/src/test/java/org/apache/gossip/SignedMessageTest.java
----------------------------------------------------------------------
diff --git a/gossip-base/src/test/java/org/apache/gossip/SignedMessageTest.java b/gossip-base/src/test/java/org/apache/gossip/SignedMessageTest.java
deleted file mode 100644
index f669a23..0000000
--- a/gossip-base/src/test/java/org/apache/gossip/SignedMessageTest.java
+++ /dev/null
@@ -1,119 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.gossip;
-
-import java.io.File;
-import java.io.IOException;
-import java.net.URI;
-import java.net.URISyntaxException;
-import java.security.NoSuchAlgorithmException;
-import java.security.NoSuchProviderException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Objects;
-import java.util.UUID;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.gossip.manager.GossipManager;
-import org.apache.gossip.manager.GossipManagerBuilder;
-import org.apache.gossip.manager.PassiveGossipConstants;
-import org.apache.gossip.secure.KeyTool;
-import org.junit.Assert;
-import org.junit.Test;
-
-import io.teknek.tunit.TUnit;
-
-public class SignedMessageTest extends AbstractIntegrationBase {
-
- private GossipSettings gossiperThatSigns(){
- GossipSettings settings = new GossipSettings();
- settings.setPersistRingState(false);
- settings.setPersistDataState(false);
- settings.setSignMessages(true);
- settings.setTransportManagerClass("org.apache.gossip.transport.UnitTestTransportManager");
- settings.setProtocolManagerClass("org.apache.gossip.protocol.UnitTestProtocolManager");
- return settings;
- }
-
- private GossipSettings gossiperThatSigns(String keysDir){
- GossipSettings settings = gossiperThatSigns();
- settings.setPathToKeyStore(Objects.requireNonNull(keysDir));
- return settings;
- }
-
- @Test
- public void dataTest() throws InterruptedException, URISyntaxException, NoSuchAlgorithmException, NoSuchProviderException, IOException {
- final String keys = System.getProperty("java.io.tmpdir") + "/keys";
- GossipSettings settings = gossiperThatSigns(keys);
- setup(keys);
- String cluster = UUID.randomUUID().toString();
- List<Member> startupMembers = new ArrayList<>();
- for (int i = 1; i < 2; ++i) {
- URI uri = new URI("udp://" + "127.0.0.1" + ":" + (30000 + i));
- startupMembers.add(new RemoteMember(cluster, uri, i + ""));
- }
- final List<GossipManager> clients = new ArrayList<>();
- for (int i = 1; i < 3; ++i) {
- URI uri = new URI("udp://" + "127.0.0.1" + ":" + (30000 + i));
- GossipManager gossipService = GossipManagerBuilder.newBuilder()
- .cluster(cluster)
- .uri(uri)
- .id(i + "")
- .gossipMembers(startupMembers)
- .gossipSettings(settings)
- .build();
- gossipService.init();
- clients.add(gossipService);
- }
- assertTwoAlive(clients);
- assertOnlySignedMessages(clients);
- cleanup(keys, clients);
- }
-
- private void assertTwoAlive(List<GossipManager> clients){
- TUnit.assertThat(() -> {
- int total = 0;
- for (int i = 0; i < clients.size(); ++i) {
- total += clients.get(i).getLiveMembers().size();
- }
- return total;
- }).afterWaitingAtMost(20, TimeUnit.SECONDS).isEqualTo(2);
- }
-
- private void assertOnlySignedMessages(List<GossipManager> clients){
- Assert.assertEquals(0, clients.get(0).getRegistry()
- .meter(PassiveGossipConstants.UNSIGNED_MESSAGE).getCount());
- Assert.assertTrue(clients.get(0).getRegistry()
- .meter(PassiveGossipConstants.SIGNED_MESSAGE).getCount() > 0);
- }
-
- private void cleanup(String keys, List<GossipManager> clients){
- new File(keys, "1").delete();
- new File(keys, "2").delete();
- new File(keys).delete();
- for (int i = 0; i < clients.size(); ++i) {
- clients.get(i).shutdown();
- }
- }
-
- private void setup(String keys) throws NoSuchAlgorithmException, NoSuchProviderException, IOException {
- new File(keys).mkdir();
- KeyTool.generatePubandPrivateKeyFiles(keys, "1");
- KeyTool.generatePubandPrivateKeyFiles(keys, "2");
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/c62ebaf9/gossip-base/src/test/java/org/apache/gossip/StartupSettingsTest.java
----------------------------------------------------------------------
diff --git a/gossip-base/src/test/java/org/apache/gossip/StartupSettingsTest.java b/gossip-base/src/test/java/org/apache/gossip/StartupSettingsTest.java
deleted file mode 100644
index ea93a90..0000000
--- a/gossip-base/src/test/java/org/apache/gossip/StartupSettingsTest.java
+++ /dev/null
@@ -1,91 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.gossip;
-
-import org.apache.gossip.manager.GossipManager;
-import org.apache.gossip.manager.GossipManagerBuilder;
-import org.apache.log4j.Logger;
-
-import org.junit.jupiter.api.Test;
-
-import java.io.File;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.net.URI;
-import java.net.URISyntaxException;
-import java.util.UUID;
-import org.junit.platform.runner.JUnitPlatform;
-import org.junit.runner.RunWith;
-
-/**
- * Tests support of using {@code StartupSettings} and thereby reading
- * setup config from file.
- */
-@RunWith(JUnitPlatform.class)
-public class StartupSettingsTest {
- private static final Logger log = Logger.getLogger(StartupSettingsTest.class);
- private static final String CLUSTER = UUID.randomUUID().toString();
-
- @Test
- public void testUsingSettingsFile() throws IOException, InterruptedException, URISyntaxException {
- File settingsFile = File.createTempFile("gossipTest",".json");
- settingsFile.deleteOnExit();
- writeSettingsFile(settingsFile);
- URI uri = new URI("udp://" + "127.0.0.1" + ":" + 50000);
- GossipSettings firstGossipSettings = new GossipSettings();
- firstGossipSettings.setTransportManagerClass("org.apache.gossip.transport.UnitTestTransportManager");
- firstGossipSettings.setProtocolManagerClass("org.apache.gossip.protocol.UnitTestProtocolManager");
- GossipManager firstService = GossipManagerBuilder.newBuilder()
- .cluster(CLUSTER)
- .uri(uri)
- .id("1")
- .gossipSettings(firstGossipSettings).build();
- firstService.init();
- GossipManager manager = GossipManagerBuilder.newBuilder()
- .startupSettings(StartupSettings.fromJSONFile(settingsFile)).build();
- manager.init();
- firstService.shutdown();
- manager.shutdown();
- }
-
- private void writeSettingsFile( File target ) throws IOException {
- String settings =
- "[{\n" + // It is odd that this is meant to be in an array, but oh well.
- " \"cluster\":\"" + CLUSTER + "\",\n" +
- " \"id\":\"" + "2" + "\",\n" +
- " \"uri\":\"udp://127.0.0.1:50001\",\n" +
- " \"gossip_interval\":1000,\n" +
- " \"window_size\":1000,\n" +
- " \"minimum_samples\":5,\n" +
- " \"cleanup_interval\":10000,\n" +
- " \"convict_threshold\":2.6,\n" +
- " \"distribution\":\"exponential\",\n" +
- " \"transport_manager_class\":\"org.apache.gossip.transport.UnitTestTransportManager\",\n" +
- " \"protocol_manager_class\":\"org.apache.gossip.protocol.UnitTestProtocolManager\",\n" +
- " \"properties\":{},\n" +
- " \"members\":[\n" +
- " {\"cluster\": \"" + CLUSTER + "\",\"uri\":\"udp://127.0.0.1:5000\"}\n" +
- " ]\n" +
- "}]";
-
- log.info( "Using settings file with contents of:\n---\n" + settings + "\n---" );
- FileOutputStream output = new FileOutputStream(target);
- output.write(settings.getBytes());
- output.close();
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/c62ebaf9/gossip-base/src/test/java/org/apache/gossip/TenNodeThreeSeedTest.java
----------------------------------------------------------------------
diff --git a/gossip-base/src/test/java/org/apache/gossip/TenNodeThreeSeedTest.java b/gossip-base/src/test/java/org/apache/gossip/TenNodeThreeSeedTest.java
deleted file mode 100644
index c6d7d46..0000000
--- a/gossip-base/src/test/java/org/apache/gossip/TenNodeThreeSeedTest.java
+++ /dev/null
@@ -1,94 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.gossip;
-
-import io.teknek.tunit.TUnit;
-
-import java.net.URI;
-import java.net.URISyntaxException;
-import java.net.UnknownHostException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.UUID;
-import java.util.concurrent.Callable;
-import java.util.concurrent.TimeUnit;
-import org.junit.platform.runner.JUnitPlatform;
-import org.junit.runner.RunWith;
-import org.apache.gossip.manager.GossipManager;
-import org.apache.gossip.manager.GossipManagerBuilder;
-import org.junit.jupiter.api.Test;
-
-@RunWith(JUnitPlatform.class)
-public class TenNodeThreeSeedTest {
-
- @Test
- public void test() throws UnknownHostException, InterruptedException, URISyntaxException {
- abc(30150);
- }
-
- @Test
- public void testAgain() throws UnknownHostException, InterruptedException, URISyntaxException {
- abc(30100);
- }
-
- public void abc(int base) throws InterruptedException, UnknownHostException, URISyntaxException {
- GossipSettings settings = new GossipSettings(1000, 10000, 1000, 1, 1.6, "exponential");
- settings.setPersistRingState(false);
- settings.setPersistDataState(false);
- settings.setTransportManagerClass("org.apache.gossip.transport.UnitTestTransportManager");
- settings.setProtocolManagerClass("org.apache.gossip.protocol.UnitTestProtocolManager");
- String cluster = UUID.randomUUID().toString();
- int seedNodes = 3;
- List<Member> startupMembers = new ArrayList<>();
- for (int i = 1; i < seedNodes+1; ++i) {
- URI uri = new URI("udp://" + "127.0.0.1" + ":" + (base + i));
- startupMembers.add(new RemoteMember(cluster, uri, i + ""));
- }
- final List<GossipManager> clients = new ArrayList<>();
- final int clusterMembers = 5;
- for (int i = 1; i < clusterMembers+1; ++i) {
- URI uri = new URI("udp://" + "127.0.0.1" + ":" + (base + i));
- GossipManager gossipService = GossipManagerBuilder.newBuilder()
- .cluster(cluster)
- .uri(uri)
- .id(i + "")
- .gossipSettings(settings)
- .gossipMembers(startupMembers)
- .build();
- gossipService.init();
- clients.add(gossipService);
- }
- TUnit.assertThat(new Callable<Integer> (){
- public Integer call() throws Exception {
- int total = 0;
- for (int i = 0; i < clusterMembers; ++i) {
- total += clients.get(i).getLiveMembers().size();
- }
- return total;
- }}).afterWaitingAtMost(40, TimeUnit.SECONDS).isEqualTo(20);
-
- for (int i = 0; i < clusterMembers; ++i) {
- int j = i;
- new Thread(){
- public void run(){
- clients.get(j).shutdown();
- }
- }.start();
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/c62ebaf9/gossip-base/src/test/java/org/apache/gossip/manager/handlers/MessageHandlerTest.java
----------------------------------------------------------------------
diff --git a/gossip-base/src/test/java/org/apache/gossip/manager/handlers/MessageHandlerTest.java b/gossip-base/src/test/java/org/apache/gossip/manager/handlers/MessageHandlerTest.java
index ec91d67..42b9353 100644
--- a/gossip-base/src/test/java/org/apache/gossip/manager/handlers/MessageHandlerTest.java
+++ b/gossip-base/src/test/java/org/apache/gossip/manager/handlers/MessageHandlerTest.java
@@ -102,7 +102,7 @@ public class MessageHandlerTest {
@Test(expected = NullPointerException.class)
public void cantAddNullHandler2() {
- MessageHandler handler = MessageHandlerFactory.concurrentHandler(
+ MessageHandlerFactory.concurrentHandler(
new TypedMessageHandler(FakeMessage.class, new FakeMessageHandler()),
null,
new TypedMessageHandler(FakeMessage.class, new FakeMessageHandler())
http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/c62ebaf9/gossip-base/src/test/java/org/apache/gossip/transport/UnitTestTransportManager.java
----------------------------------------------------------------------
diff --git a/gossip-base/src/test/java/org/apache/gossip/transport/UnitTestTransportManager.java b/gossip-base/src/test/java/org/apache/gossip/transport/UnitTestTransportManager.java
index a783b75..206bc62 100644
--- a/gossip-base/src/test/java/org/apache/gossip/transport/UnitTestTransportManager.java
+++ b/gossip-base/src/test/java/org/apache/gossip/transport/UnitTestTransportManager.java
@@ -29,7 +29,7 @@ import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
/** Only use in unit tests! */
-public class UnitTestTransportManager extends AbstractTransportManager {
+public class UnitTestTransportManager extends AbstractTransportManager {
private static final Map<URI, UnitTestTransportManager> allManagers = new ConcurrentHashMap<>();
@@ -71,6 +71,5 @@ public class UnitTestTransportManager extends AbstractTransportManager {
@Override
public void startEndpoint() {
allManagers.put(localEndpoint, this);
- super.startEndpoint();
}
}
http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/c62ebaf9/gossip-itest/pom.xml
----------------------------------------------------------------------
diff --git a/gossip-itest/pom.xml b/gossip-itest/pom.xml
new file mode 100644
index 0000000..6067732
--- /dev/null
+++ b/gossip-itest/pom.xml
@@ -0,0 +1,88 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+
+ <parent>
+ <groupId>org.apache.gossip</groupId>
+ <artifactId>gossip-parent</artifactId>
+ <version>0.1.3-incubating-SNAPSHOT</version>
+ <relativePath>../pom.xml</relativePath>
+ </parent>
+
+ <name>Gossip itest</name>
+ <artifactId>gossip-itest</artifactId>
+ <version>0.1.3-incubating-SNAPSHOT</version>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.gossip</groupId>
+ <artifactId>gossip-base</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.gossip</groupId>
+ <artifactId>gossip-base</artifactId>
+ <version>${project.version}</version>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.gossip</groupId>
+ <artifactId>gossip-protocol-jackson</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.gossip</groupId>
+ <artifactId>gossip-transport-udp</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ </dependencies>
+
+ <build>
+ <plugins>
+ <plugin>
+ <artifactId>maven-compiler-plugin</artifactId>
+ <version>3.1</version>
+ <configuration>
+ <source>${java.version}</source>
+ <target>${java.version}</target>
+ </configuration>
+ </plugin>
+ <plugin>
+ <artifactId>maven-surefire-plugin</artifactId>
+ <version>2.19.1</version>
+ <configuration>
+ <systemPropertyVariables>
+ <java.io.tmpdir>${project.build.directory}</java.io.tmpdir>
+ </systemPropertyVariables>
+ </configuration>
+ <dependencies>
+ <dependency>
+ <groupId>org.junit.platform</groupId>
+ <artifactId>junit-platform-surefire-provider</artifactId>
+ <version>${junit.platform.version}</version>
+ </dependency>
+ </dependencies>
+ </plugin>
+ </plugins>
+ </build>
+</project>
http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/c62ebaf9/gossip-itest/src/test/java/org/apache/gossip/DataTest.java
----------------------------------------------------------------------
diff --git a/gossip-itest/src/test/java/org/apache/gossip/DataTest.java b/gossip-itest/src/test/java/org/apache/gossip/DataTest.java
new file mode 100644
index 0000000..9fe9aa9
--- /dev/null
+++ b/gossip-itest/src/test/java/org/apache/gossip/DataTest.java
@@ -0,0 +1,238 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gossip;
+
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.net.UnknownHostException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.gossip.crdt.GrowOnlyCounter;
+import org.apache.gossip.crdt.GrowOnlySet;
+import org.apache.gossip.crdt.OrSet;
+import org.apache.gossip.manager.GossipManager;
+import org.apache.gossip.manager.GossipManagerBuilder;
+import org.apache.gossip.model.PerNodeDataMessage;
+import org.apache.gossip.model.SharedDataMessage;
+import org.junit.Test;
+
+import io.teknek.tunit.TUnit;
+
+public class DataTest extends AbstractIntegrationBase {
+
+ private String orSetKey = "cror";
+ private String gCounterKey = "crdtgc";
+
+ @Test
+ public void dataTest() throws InterruptedException, UnknownHostException, URISyntaxException{
+ GossipSettings settings = new GossipSettings();
+ settings.setPersistRingState(false);
+ settings.setPersistDataState(false);
+ String cluster = UUID.randomUUID().toString();
+ int seedNodes = 1;
+ List<Member> startupMembers = new ArrayList<>();
+ for (int i = 1; i < seedNodes+1; ++i) {
+ URI uri = new URI("udp://" + "127.0.0.1" + ":" + (50000 + i));
+ startupMembers.add(new RemoteMember(cluster, uri, i + ""));
+ }
+ final List<GossipManager> clients = new ArrayList<>();
+ final int clusterMembers = 2;
+ for (int i = 1; i < clusterMembers + 1; ++i) {
+ URI uri = new URI("udp://" + "127.0.0.1" + ":" + (50000 + i));
+ GossipManager gossipService = GossipManagerBuilder.newBuilder().cluster(cluster).uri(uri)
+ .id(i + "").gossipMembers(startupMembers).gossipSettings(settings).build();
+ clients.add(gossipService);
+ gossipService.init();
+ register(gossipService);
+ }
+ TUnit.assertThat(() -> {
+ int total = 0;
+ for (int i = 0; i < clusterMembers; ++i) {
+ total += clients.get(i).getLiveMembers().size();
+ }
+ return total;
+ }).afterWaitingAtMost(20, TimeUnit.SECONDS).isEqualTo(2);
+ clients.get(0).gossipPerNodeData(msg());
+ clients.get(0).gossipSharedData(sharedMsg());
+
+ TUnit.assertThat(()-> {
+ PerNodeDataMessage x = clients.get(1).findPerNodeGossipData(1 + "", "a");
+ if (x == null)
+ return "";
+ else
+ return x.getPayload();
+ }).afterWaitingAtMost(20, TimeUnit.SECONDS).isEqualTo("b");
+
+ TUnit.assertThat(() -> {
+ SharedDataMessage x = clients.get(1).findSharedGossipData("a");
+ if (x == null)
+ return "";
+ else
+ return x.getPayload();
+ }).afterWaitingAtMost(20, TimeUnit.SECONDS).isEqualTo("c");
+
+
+ givenDifferentDatumsInSet(clients);
+ assertThatListIsMerged(clients);
+
+ givenOrs(clients);
+ assertThatOrSetIsMerged(clients);
+ dropIt(clients);
+ assertThatOrSetDelIsMerged(clients);
+
+
+ // test g counter
+ givenDifferentIncrement(clients);
+ assertThatCountIsUpdated(clients, 3);
+ givenIncreaseOther(clients);
+ assertThatCountIsUpdated(clients, 7);
+
+ for (int i = 0; i < clusterMembers; ++i) {
+ clients.get(i).shutdown();
+ }
+ }
+
+ private void givenDifferentIncrement(final List<GossipManager> clients) {
+ {
+ SharedDataMessage d = new SharedDataMessage();
+ d.setKey(gCounterKey);
+ d.setPayload(new GrowOnlyCounter(new GrowOnlyCounter.Builder(clients.get(0)).increment(1L)));
+ d.setExpireAt(Long.MAX_VALUE);
+ d.setTimestamp(System.currentTimeMillis());
+ clients.get(0).merge(d);
+ }
+ {
+ SharedDataMessage d = new SharedDataMessage();
+ d.setKey(gCounterKey);
+ d.setPayload(new GrowOnlyCounter(new GrowOnlyCounter.Builder(clients.get(1)).increment(2L)));
+ d.setExpireAt(Long.MAX_VALUE);
+ d.setTimestamp(System.currentTimeMillis());
+ clients.get(1).merge(d);
+ }
+ }
+
+ private void givenIncreaseOther(final List<GossipManager> clients) {
+ GrowOnlyCounter gc = (GrowOnlyCounter) clients.get(1).findCrdt(gCounterKey);
+ GrowOnlyCounter gc2 = new GrowOnlyCounter(gc,
+ new GrowOnlyCounter.Builder(clients.get(1)).increment(4L));
+
+ SharedDataMessage d = new SharedDataMessage();
+ d.setKey(gCounterKey);
+ d.setPayload(gc2);
+ d.setExpireAt(Long.MAX_VALUE);
+ d.setTimestamp(System.currentTimeMillis());
+ clients.get(1).merge(d);
+ }
+
+ private void givenOrs(List<GossipManager> clients) {
+ {
+ SharedDataMessage d = new SharedDataMessage();
+ d.setKey(orSetKey);
+ d.setPayload(new OrSet<String>("1", "2"));
+ d.setExpireAt(Long.MAX_VALUE);
+ d.setTimestamp(System.currentTimeMillis());
+ clients.get(0).merge(d);
+ }
+ {
+ SharedDataMessage d = new SharedDataMessage();
+ d.setKey(orSetKey);
+ d.setPayload(new OrSet<String>("3", "4"));
+ d.setExpireAt(Long.MAX_VALUE);
+ d.setTimestamp(System.currentTimeMillis());
+ clients.get(1).merge(d);
+ }
+ }
+
+ private void dropIt(List<GossipManager> clients) {
+ @SuppressWarnings("unchecked")
+ OrSet<String> o = (OrSet<String>) clients.get(0).findCrdt(orSetKey);
+ OrSet<String> o2 = new OrSet<String>(o, new OrSet.Builder<String>().remove("3"));
+ SharedDataMessage d = new SharedDataMessage();
+ d.setKey(orSetKey);
+ d.setPayload(o2);
+ d.setExpireAt(Long.MAX_VALUE);
+ d.setTimestamp(System.currentTimeMillis());
+ clients.get(0).merge(d);
+ }
+
+ private void assertThatOrSetIsMerged(final List<GossipManager> clients){
+ TUnit.assertThat(() -> {
+ return clients.get(0).findCrdt(orSetKey).value();
+ }).afterWaitingAtMost(10, TimeUnit.SECONDS).isEqualTo(new OrSet<String>("1", "2", "3", "4").value());
+ TUnit.assertThat(() -> {
+ return clients.get(1).findCrdt(orSetKey).value();
+ }).afterWaitingAtMost(10, TimeUnit.SECONDS).isEqualTo(new OrSet<String>("1", "2", "3", "4").value());
+ }
+
+ private void assertThatOrSetDelIsMerged(final List<GossipManager> clients){ // checks that node 0's OR-set no longer exposes "3" after dropIt
+ TUnit.assertThat(() -> {
+ return clients.get(0).findCrdt(orSetKey).value(); // compare the visible values, the same way assertThatOrSetIsMerged does
+ }).afterWaitingAtMost(10, TimeUnit.SECONDS).isEqualTo(new OrSet<String>("1", "2", "4").value()); // was .equals(...): its boolean result was discarded, so nothing was asserted
+ }
+
+ private void givenDifferentDatumsInSet(final List<GossipManager> clients){
+ clients.get(0).merge(CrdtMessage("1"));
+ clients.get(1).merge(CrdtMessage("2"));
+ }
+
+
+ private void assertThatCountIsUpdated(final List<GossipManager> clients, long finalCount) {
+ TUnit.assertThat(() -> {
+ return clients.get(0).findCrdt(gCounterKey);
+ }).afterWaitingAtMost(10, TimeUnit.SECONDS).isEqualTo(new GrowOnlyCounter(
+ new GrowOnlyCounter.Builder(clients.get(0)).increment(finalCount)));
+ }
+
+ private void assertThatListIsMerged(final List<GossipManager> clients){
+ TUnit.assertThat(() -> {
+ return clients.get(0).findCrdt("cr");
+ }).afterWaitingAtMost(10, TimeUnit.SECONDS).isEqualTo(new GrowOnlySet<String>(Arrays.asList("1","2")));
+ }
+
+ private SharedDataMessage CrdtMessage(String item){
+ SharedDataMessage d = new SharedDataMessage();
+ d.setKey("cr");
+ d.setPayload(new GrowOnlySet<String>( Arrays.asList(item)));
+ d.setExpireAt(Long.MAX_VALUE);
+ d.setTimestamp(System.currentTimeMillis());
+ return d;
+ }
+
+ private PerNodeDataMessage msg(){
+ PerNodeDataMessage g = new PerNodeDataMessage();
+ g.setExpireAt(Long.MAX_VALUE);
+ g.setKey("a");
+ g.setPayload("b");
+ g.setTimestamp(System.currentTimeMillis());
+ return g;
+ }
+
+ private SharedDataMessage sharedMsg(){
+ SharedDataMessage g = new SharedDataMessage();
+ g.setExpireAt(Long.MAX_VALUE);
+ g.setKey("a");
+ g.setPayload("c");
+ g.setTimestamp(System.currentTimeMillis());
+ return g;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/c62ebaf9/gossip-itest/src/test/java/org/apache/gossip/IdAndPropertyTest.java
----------------------------------------------------------------------
diff --git a/gossip-itest/src/test/java/org/apache/gossip/IdAndPropertyTest.java b/gossip-itest/src/test/java/org/apache/gossip/IdAndPropertyTest.java
new file mode 100644
index 0000000..7f550de
--- /dev/null
+++ b/gossip-itest/src/test/java/org/apache/gossip/IdAndPropertyTest.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gossip;
+
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.net.UnknownHostException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.gossip.manager.DatacenterRackAwareActiveGossiper;
+import org.apache.gossip.manager.GossipManager;
+import org.apache.gossip.manager.GossipManagerBuilder;
+import org.junit.jupiter.api.Test;
+import org.junit.platform.runner.JUnitPlatform;
+import org.junit.runner.RunWith;
+
+import io.teknek.tunit.TUnit;
+
+@RunWith(JUnitPlatform.class)
+public class IdAndPropertyTest extends AbstractIntegrationBase {
+
+  /**
+   * Returns the value stored under {@code key} in the properties of the first live
+   * member known to {@code manager}, or the empty string when the lookup throws a
+   * {@link RuntimeException} (for example while the live-member list is still empty).
+   */
+  private static String firstLiveMemberProperty(GossipManager manager, String key) {
+    try {
+      return manager.getLiveMembers().get(0).getProperties().get(key);
+    } catch (RuntimeException ignored) {
+      // Swallowed on purpose: "" simply fails the comparison; presumably TUnit
+      // re-evaluates the lookup until its timeout expires.
+      return "";
+    }
+  }
+
+  /**
+   * Starts two nodes of cluster "a" that use the datacenter/rack aware active gossiper,
+   * each with its own property map, and checks that within 10 seconds each node reports
+   * the other node's value of property "a" on its first live member.
+   */
+  @Test
+  public void testDatacenterRackGossiper() throws URISyntaxException, UnknownHostException, InterruptedException {
+    final GossipSettings settings = new GossipSettings();
+    settings.setActiveGossipClass(DatacenterRackAwareActiveGossiper.class.getName());
+
+    // Node "0": no startup members, property a=b.
+    final List<Member> startupMembers = new ArrayList<>();
+    final Map<String, String> firstProperties = new HashMap<>();
+    firstProperties.put("a", "b");
+    firstProperties.put("datacenter", "dc1");
+    firstProperties.put("rack", "rack1");
+    final GossipManager gossipService1 = GossipManagerBuilder.newBuilder()
+        .cluster("a")
+        .uri(new URI("udp://" + "127.0.0.1" + ":" + (29000 + 0)))
+        .id("0")
+        .properties(firstProperties)
+        .gossipMembers(startupMembers)
+        .gossipSettings(settings)
+        .build();
+    gossipService1.init();
+    register(gossipService1);
+
+    // Node "1": seeded with node "0", property a=c.
+    final Map<String, String> secondProperties = new HashMap<>();
+    secondProperties.put("a", "c");
+    secondProperties.put("datacenter", "dc2");
+    secondProperties.put("rack", "rack2");
+    final GossipManager gossipService2 = GossipManagerBuilder.newBuilder()
+        .cluster("a")
+        .uri(new URI("udp://" + "127.0.0.1" + ":" + (29000 + 10)))
+        .id("1")
+        .properties(secondProperties)
+        .gossipMembers(Arrays.asList(new RemoteMember("a",
+            new URI("udp://" + "127.0.0.1" + ":" + (29000 + 0)), "0")))
+        .gossipSettings(settings)
+        .build();
+    gossipService2.init();
+    register(gossipService2);
+
+    TUnit.assertThat(() -> firstLiveMemberProperty(gossipService1, "a"))
+        .afterWaitingAtMost(10, TimeUnit.SECONDS)
+        .isEqualTo("c");
+
+    TUnit.assertThat(() -> firstLiveMemberProperty(gossipService2, "a"))
+        .afterWaitingAtMost(10, TimeUnit.SECONDS)
+        .isEqualTo("b");
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/c62ebaf9/gossip-itest/src/test/java/org/apache/gossip/ShutdownDeadtimeTest.java
----------------------------------------------------------------------
diff --git a/gossip-itest/src/test/java/org/apache/gossip/ShutdownDeadtimeTest.java b/gossip-itest/src/test/java/org/apache/gossip/ShutdownDeadtimeTest.java
new file mode 100644
index 0000000..dd5bfe9
--- /dev/null
+++ b/gossip-itest/src/test/java/org/apache/gossip/ShutdownDeadtimeTest.java
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gossip;
+
+import io.teknek.tunit.TUnit;
+
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.net.UnknownHostException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Random;
+import java.util.UUID;
+import java.util.concurrent.Callable;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.gossip.manager.GossipManager;
+import org.apache.gossip.manager.GossipManagerBuilder;
+import org.apache.log4j.Logger;
+
+import org.junit.platform.runner.JUnitPlatform;
+import org.junit.jupiter.api.Test;
+
+import org.junit.runner.RunWith;
+
+@RunWith(JUnitPlatform.class)
+public class ShutdownDeadtimeTest {
+
+  private static final Logger log = Logger.getLogger(ShutdownDeadtimeTest.class);
+
+  /** Builds the loopback UDP URI for the given port. */
+  private static URI nodeUri(int port) throws URISyntaxException {
+    return new URI("udp://" + "127.0.0.1" + ":" + port);
+  }
+
+  /**
+   * Returns a callable that sums the live-member counts reported by the first
+   * {@code nodeCount} entries of {@code nodes}; the list is re-read on every call.
+   */
+  private static Callable<Integer> liveMemberTotal(final List<GossipManager> nodes, final int nodeCount) {
+    return () -> {
+      int sum = 0;
+      for (int idx = 0; idx < nodeCount; ++idx) {
+        sum += nodes.get(idx).getLiveMembers().size();
+      }
+      return sum;
+    };
+  }
+
+  /**
+   * Returns a callable that sums the dead-member counts reported by the first
+   * {@code nodeCount} entries of {@code nodes}; the list is re-read on every call.
+   */
+  private static Callable<Integer> deadMemberTotal(final List<GossipManager> nodes, final int nodeCount) {
+    return () -> {
+      int sum = 0;
+      for (int idx = 0; idx < nodeCount; ++idx) {
+        sum += nodes.get(idx).getDeadMembers().size();
+      }
+      return sum;
+    };
+  }
+
+  /**
+   * Starts five nodes (three of them seeds), shuts one random node down, waits for the
+   * others to list it as dead, restarts it on the same port with the same id and checks
+   * that the cluster is complete again.
+   *
+   * Note: this test is flaky depending on the values in GossipSettings (smaller values
+   * seem to do harm), and on the sleep that happens after each node starts.
+   */
+  @Test
+  public void DeadNodesDoNotComeAliveAgain()
+          throws InterruptedException, UnknownHostException, URISyntaxException {
+    final GossipSettings settings = new GossipSettings(100, 10000, 1000, 1, 10.0, "normal");
+    settings.setPersistRingState(false);
+    settings.setPersistDataState(false);
+    final String cluster = UUID.randomUUID().toString();
+
+    // Seed list: nodes 1..3 on ports 30301..30303.
+    final int seedNodes = 3;
+    final List<Member> startupMembers = new ArrayList<>();
+    for (int i = 1; i <= seedNodes; ++i) {
+      startupMembers.add(new RemoteMember(cluster, nodeUri(30300 + i), i + ""));
+    }
+
+    // Start nodes 1..5, pausing one second after each start.
+    final List<GossipManager> clients = Collections.synchronizedList(new ArrayList<GossipManager>());
+    final int clusterMembers = 5;
+    for (int i = 1; i <= clusterMembers; ++i) {
+      final GossipManager node = GossipManagerBuilder.newBuilder()
+              .cluster(cluster)
+              .uri(nodeUri(30300 + i))
+              .id(i + "")
+              .gossipMembers(startupMembers)
+              .gossipSettings(settings)
+              .build();
+      clients.add(node);
+      node.init();
+      Thread.sleep(1000);
+    }
+    // 5 nodes, each seeing the 4 others.
+    TUnit.assertThat(liveMemberTotal(clients, clusterMembers))
+            .afterWaitingAtMost(40, TimeUnit.SECONDS).isEqualTo(20);
+
+    // shutdown one client and verify that one client is lost.
+    final Random random = new Random();
+    final int randomClientId = random.nextInt(clusterMembers);
+    log.info("shutting down " + randomClientId);
+    final GossipManager victim = clients.get(randomClientId);
+    final int shutdownPort = victim.getMyself().getUri().getPort();
+    final String shutdownId = victim.getMyself().getId();
+    victim.shutdown();
+    TUnit.assertThat(liveMemberTotal(clients, clusterMembers))
+            .afterWaitingAtMost(40, TimeUnit.SECONDS).isEqualTo(16);
+    clients.remove(randomClientId); // int argument: removes by index
+
+    // Each of the 4 remaining nodes must list exactly one dead member.
+    TUnit.assertThat(deadMemberTotal(clients, clusterMembers - 1))
+            .afterWaitingAtMost(50, TimeUnit.SECONDS).isEqualTo(4);
+
+    // start client again
+    final GossipManager restarted = GossipManagerBuilder.newBuilder()
+            .gossipSettings(settings)
+            .cluster(cluster)
+            .uri(nodeUri(shutdownPort))
+            .id(shutdownId + "")
+            .gossipMembers(startupMembers)
+            .build();
+    clients.add(restarted);
+    restarted.init();
+
+    // verify that the client is alive again for every node
+    TUnit.assertThat(liveMemberTotal(clients, clusterMembers))
+            .afterWaitingAtMost(60, TimeUnit.SECONDS).isEqualTo(20);
+
+    // Shut every node down on its own thread; the threads are not joined.
+    for (int i = 0; i < clusterMembers; ++i) {
+      final int index = i;
+      new Thread(() -> clients.get(index).shutdown()).start();
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/c62ebaf9/gossip-itest/src/test/java/org/apache/gossip/SignedMessageTest.java
----------------------------------------------------------------------
diff --git a/gossip-itest/src/test/java/org/apache/gossip/SignedMessageTest.java b/gossip-itest/src/test/java/org/apache/gossip/SignedMessageTest.java
new file mode 100644
index 0000000..e288cb8
--- /dev/null
+++ b/gossip-itest/src/test/java/org/apache/gossip/SignedMessageTest.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gossip;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.security.NoSuchAlgorithmException;
+import java.security.NoSuchProviderException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Objects;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.gossip.manager.GossipManager;
+import org.apache.gossip.manager.GossipManagerBuilder;
+import org.apache.gossip.manager.PassiveGossipConstants;
+import org.apache.gossip.secure.KeyTool;
+import org.junit.Assert;
+import org.junit.Test;
+
+import io.teknek.tunit.TUnit;
+
+public class SignedMessageTest extends AbstractIntegrationBase {
+
+  /** Returns settings with ring/data persistence disabled and message signing enabled. */
+  private GossipSettings gossiperThatSigns(){
+    GossipSettings settings = new GossipSettings();
+    settings.setPersistRingState(false);
+    settings.setPersistDataState(false);
+    settings.setSignMessages(true);
+    return settings;
+  }
+
+  /**
+   * Same as {@link #gossiperThatSigns()}, additionally pointing the key store path at
+   * {@code keysDir}.
+   *
+   * @throws NullPointerException if {@code keysDir} is null
+   */
+  private GossipSettings gossiperThatSigns(String keysDir){
+    GossipSettings settings = gossiperThatSigns();
+    settings.setPathToKeyStore(Objects.requireNonNull(keysDir));
+    return settings;
+  }
+
+  /**
+   * Generates keys for nodes "1" and "2", starts both nodes with signing enabled and
+   * checks that they see each other and that node "1" metered only signed messages.
+   */
+  @Test
+  public void dataTest() throws InterruptedException, URISyntaxException, NoSuchAlgorithmException, NoSuchProviderException, IOException {
+    final String keys = System.getProperty("java.io.tmpdir") + "/keys";
+    final List<GossipManager> clients = new ArrayList<>();
+    try {
+      GossipSettings settings = gossiperThatSigns(keys);
+      setup(keys);
+      String cluster = UUID.randomUUID().toString();
+      // Only node "1" is a seed.
+      List<Member> startupMembers = new ArrayList<>();
+      for (int i = 1; i < 2; ++i) {
+        URI uri = new URI("udp://" + "127.0.0.1" + ":" + (30000 + i));
+        startupMembers.add(new RemoteMember(cluster, uri, i + ""));
+      }
+      for (int i = 1; i < 3; ++i) {
+        URI uri = new URI("udp://" + "127.0.0.1" + ":" + (30000 + i));
+        GossipManager gossipService = GossipManagerBuilder.newBuilder()
+                .cluster(cluster)
+                .uri(uri)
+                .id(i + "")
+                .gossipMembers(startupMembers)
+                .gossipSettings(settings)
+                .build();
+        gossipService.init();
+        clients.add(gossipService);
+      }
+      assertTwoAlive(clients);
+      assertOnlySignedMessages(clients);
+    } finally {
+      // Run cleanup even when an assertion fails; otherwise the key files stay on disk
+      // and the started managers are never shut down.
+      cleanup(keys, clients);
+    }
+  }
+
+  /**
+   * Asserts, waiting at most 20 seconds, that the live-member counts of all
+   * {@code clients} add up to 2.
+   */
+  private void assertTwoAlive(List<GossipManager> clients){
+    TUnit.assertThat(() -> {
+      int total = 0;
+      for (int i = 0; i < clients.size(); ++i) {
+        total += clients.get(i).getLiveMembers().size();
+      }
+      return total;
+    }).afterWaitingAtMost(20, TimeUnit.SECONDS).isEqualTo(2);
+  }
+
+  /**
+   * Asserts on the first client's metric registry that the unsigned-message meter is
+   * still at zero and the signed-message meter has counted at least one message.
+   */
+  private void assertOnlySignedMessages(List<GossipManager> clients){
+    Assert.assertEquals(0, clients.get(0).getRegistry()
+            .meter(PassiveGossipConstants.UNSIGNED_MESSAGE).getCount());
+    Assert.assertTrue(clients.get(0).getRegistry()
+            .meter(PassiveGossipConstants.SIGNED_MESSAGE).getCount() > 0);
+  }
+
+  /**
+   * Deletes the files "1" and "2" under {@code keys}, then the directory itself, and
+   * shuts down every manager in {@code clients}. Failed deletions are ignored.
+   */
+  private void cleanup(String keys, List<GossipManager> clients){
+    new File(keys, "1").delete();
+    new File(keys, "2").delete();
+    new File(keys).delete();
+    for (int i = 0; i < clients.size(); ++i) {
+      clients.get(i).shutdown();
+    }
+  }
+
+  /** Creates the {@code keys} directory and generates key files for ids "1" and "2". */
+  private void setup(String keys) throws NoSuchAlgorithmException, NoSuchProviderException, IOException {
+    new File(keys).mkdir();
+    KeyTool.generatePubandPrivateKeyFiles(keys, "1");
+    KeyTool.generatePubandPrivateKeyFiles(keys, "2");
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/c62ebaf9/gossip-itest/src/test/java/org/apache/gossip/StartupSettingsTest.java
----------------------------------------------------------------------
diff --git a/gossip-itest/src/test/java/org/apache/gossip/StartupSettingsTest.java b/gossip-itest/src/test/java/org/apache/gossip/StartupSettingsTest.java
new file mode 100644
index 0000000..ea93a90
--- /dev/null
+++ b/gossip-itest/src/test/java/org/apache/gossip/StartupSettingsTest.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gossip;
+
+import org.apache.gossip.manager.GossipManager;
+import org.apache.gossip.manager.GossipManagerBuilder;
+import org.apache.log4j.Logger;
+
+import org.junit.jupiter.api.Test;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.UUID;
+import org.junit.platform.runner.JUnitPlatform;
+import org.junit.runner.RunWith;
+
+/**
+ * Tests support of using {@code StartupSettings} and thereby reading
+ * setup config from file.
+ */
+@RunWith(JUnitPlatform.class)
+public class StartupSettingsTest {
+ private static final Logger log = Logger.getLogger(StartupSettingsTest.class);
+ private static final String CLUSTER = UUID.randomUUID().toString();
+
+ @Test
+ public void testUsingSettingsFile() throws IOException, InterruptedException, URISyntaxException {
+ File settingsFile = File.createTempFile("gossipTest",".json");
+ settingsFile.deleteOnExit();
+ writeSettingsFile(settingsFile);
+ URI uri = new URI("udp://" + "127.0.0.1" + ":" + 50000);
+ GossipSettings firstGossipSettings = new GossipSettings();
+ firstGossipSettings.setTransportManagerClass("org.apache.gossip.transport.UnitTestTransportManager");
+ firstGossipSettings.setProtocolManagerClass("org.apache.gossip.protocol.UnitTestProtocolManager");
+ GossipManager firstService = GossipManagerBuilder.newBuilder()
+ .cluster(CLUSTER)
+ .uri(uri)
+ .id("1")
+ .gossipSettings(firstGossipSettings).build();
+ firstService.init();
+ GossipManager manager = GossipManagerBuilder.newBuilder()
+ .startupSettings(StartupSettings.fromJSONFile(settingsFile)).build();
+ manager.init();
+ firstService.shutdown();
+ manager.shutdown();
+ }
+
+ private void writeSettingsFile( File target ) throws IOException {
+ String settings =
+ "[{\n" + // It is odd that this is meant to be in an array, but oh well.
+ " \"cluster\":\"" + CLUSTER + "\",\n" +
+ " \"id\":\"" + "2" + "\",\n" +
+ " \"uri\":\"udp://127.0.0.1:50001\",\n" +
+ " \"gossip_interval\":1000,\n" +
+ " \"window_size\":1000,\n" +
+ " \"minimum_samples\":5,\n" +
+ " \"cleanup_interval\":10000,\n" +
+ " \"convict_threshold\":2.6,\n" +
+ " \"distribution\":\"exponential\",\n" +
+ " \"transport_manager_class\":\"org.apache.gossip.transport.UnitTestTransportManager\",\n" +
+ " \"protocol_manager_class\":\"org.apache.gossip.protocol.UnitTestProtocolManager\",\n" +
+ " \"properties\":{},\n" +
+ " \"members\":[\n" +
+ " {\"cluster\": \"" + CLUSTER + "\",\"uri\":\"udp://127.0.0.1:5000\"}\n" +
+ " ]\n" +
+ "}]";
+
+ log.info( "Using settings file with contents of:\n---\n" + settings + "\n---" );
+ FileOutputStream output = new FileOutputStream(target);
+ output.write(settings.getBytes());
+ output.close();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/c62ebaf9/gossip-itest/src/test/java/org/apache/gossip/TenNodeThreeSeedTest.java
----------------------------------------------------------------------
diff --git a/gossip-itest/src/test/java/org/apache/gossip/TenNodeThreeSeedTest.java b/gossip-itest/src/test/java/org/apache/gossip/TenNodeThreeSeedTest.java
new file mode 100644
index 0000000..8ae783e
--- /dev/null
+++ b/gossip-itest/src/test/java/org/apache/gossip/TenNodeThreeSeedTest.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gossip;
+
+import io.teknek.tunit.TUnit;
+
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.net.UnknownHostException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.Callable;
+import java.util.concurrent.TimeUnit;
+import org.junit.platform.runner.JUnitPlatform;
+import org.junit.runner.RunWith;
+import org.apache.gossip.manager.GossipManager;
+import org.apache.gossip.manager.GossipManagerBuilder;
+import org.junit.jupiter.api.Test;
+
+@RunWith(JUnitPlatform.class)
+public class TenNodeThreeSeedTest {
+
+  /** Runs the cluster scenario on ports starting above 30150. */
+  @Test
+  public void test() throws UnknownHostException, InterruptedException, URISyntaxException {
+    abc(30150);
+  }
+
+  /** Runs the same scenario a second time on ports starting above 30100. */
+  @Test
+  public void testAgain() throws UnknownHostException, InterruptedException, URISyntaxException {
+    abc(30100);
+  }
+
+  /**
+   * Starts five nodes on ports {@code base + 1} to {@code base + 5}, the first three of
+   * which are seeds, waits at most 40 seconds until the live-member counts of all nodes
+   * add up to 20, then shuts every node down on its own thread.
+   *
+   * @param base port offset; node {@code i} listens on {@code base + i}
+   */
+  public void abc(int base) throws InterruptedException, UnknownHostException, URISyntaxException {
+    final GossipSettings settings = new GossipSettings(1000, 10000, 1000, 1, 1.6, "exponential");
+    settings.setPersistRingState(false);
+    settings.setPersistDataState(false);
+    final String cluster = UUID.randomUUID().toString();
+
+    final int seedNodes = 3;
+    final List<Member> startupMembers = new ArrayList<>();
+    for (int i = 1; i <= seedNodes; ++i) {
+      final URI seedUri = new URI("udp://" + "127.0.0.1" + ":" + (base + i));
+      startupMembers.add(new RemoteMember(cluster, seedUri, i + ""));
+    }
+
+    final List<GossipManager> clients = new ArrayList<>();
+    final int clusterMembers = 5;
+    for (int i = 1; i <= clusterMembers; ++i) {
+      final URI nodeUri = new URI("udp://" + "127.0.0.1" + ":" + (base + i));
+      final GossipManager node = GossipManagerBuilder.newBuilder()
+              .cluster(cluster)
+              .uri(nodeUri)
+              .id(i + "")
+              .gossipSettings(settings)
+              .gossipMembers(startupMembers)
+              .build();
+      node.init();
+      clients.add(node);
+    }
+
+    // 5 nodes, each seeing the 4 others.
+    final Callable<Integer> liveMemberTotal = () -> {
+      int sum = 0;
+      for (int idx = 0; idx < clusterMembers; ++idx) {
+        sum += clients.get(idx).getLiveMembers().size();
+      }
+      return sum;
+    };
+    TUnit.assertThat(liveMemberTotal).afterWaitingAtMost(40, TimeUnit.SECONDS).isEqualTo(20);
+
+    // The shutdown threads are started but not joined.
+    for (int i = 0; i < clusterMembers; ++i) {
+      final int index = i;
+      new Thread(() -> clients.get(index).shutdown()).start();
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/c62ebaf9/gossip-protocol-jackson/pom.xml
----------------------------------------------------------------------
diff --git a/gossip-protocol-jackson/pom.xml b/gossip-protocol-jackson/pom.xml
index 067a27e..128a26d 100644
--- a/gossip-protocol-jackson/pom.xml
+++ b/gossip-protocol-jackson/pom.xml
@@ -36,16 +36,16 @@
<dependency>
<groupId>org.apache.gossip</groupId>
<artifactId>gossip-base</artifactId>
- <version>0.1.3-incubating-SNAPSHOT</version>
+ <version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.gossip</groupId>
<artifactId>gossip-base</artifactId>
- <version>0.1.3-incubating-SNAPSHOT</version>
+ <version>${project.version}</version>
<type>test-jar</type>
<scope>test</scope>
</dependency>
</dependencies>
-</project>
\ No newline at end of file
+</project>
http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/c62ebaf9/gossip-protocol-jackson/src/test/java/org/apache/gossip/protocol/json/JacksonTest.java
----------------------------------------------------------------------
diff --git a/gossip-protocol-jackson/src/test/java/org/apache/gossip/protocol/json/JacksonTest.java b/gossip-protocol-jackson/src/test/java/org/apache/gossip/protocol/json/JacksonTest.java
index bd8a949..cbac460 100644
--- a/gossip-protocol-jackson/src/test/java/org/apache/gossip/protocol/json/JacksonTest.java
+++ b/gossip-protocol-jackson/src/test/java/org/apache/gossip/protocol/json/JacksonTest.java
@@ -25,9 +25,7 @@ import org.apache.gossip.Member;
import org.apache.gossip.crdt.OrSet;
import org.apache.gossip.manager.GossipManager;
import org.apache.gossip.manager.GossipManagerBuilder;
-import org.apache.gossip.model.Base;
import org.apache.gossip.protocol.ProtocolManager;
-import org.apache.gossip.udp.Trackable;
import org.junit.Assert;
import org.junit.Test;
@@ -36,11 +34,7 @@ import java.net.URI;
import java.net.URISyntaxException;
import java.net.UnknownHostException;
import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.HashMap;
import java.util.List;
-import java.util.Map;
-import java.util.Objects;
import java.util.UUID;
public class JacksonTest {
http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/c62ebaf9/gossip-protocol-jackson/src/test/java/org/apache/gossip/protocol/json/TestMessage.java
----------------------------------------------------------------------
diff --git a/gossip-protocol-jackson/src/test/java/org/apache/gossip/protocol/json/TestMessage.java b/gossip-protocol-jackson/src/test/java/org/apache/gossip/protocol/json/TestMessage.java
index 43032de..7ac211d 100644
--- a/gossip-protocol-jackson/src/test/java/org/apache/gossip/protocol/json/TestMessage.java
+++ b/gossip-protocol-jackson/src/test/java/org/apache/gossip/protocol/json/TestMessage.java
@@ -41,6 +41,7 @@ class TestMessage extends Base implements Trackable {
private Object[] arrayOfThings;
private Map<String, String> mapOfThings = new HashMap<>();
+ @SuppressWarnings("unused")//Used by ObjectMapper
private TestMessage() {
}
http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/c62ebaf9/gossip-transport-udp/pom.xml
----------------------------------------------------------------------
diff --git a/gossip-transport-udp/pom.xml b/gossip-transport-udp/pom.xml
index 2e79b1a..446aace 100644
--- a/gossip-transport-udp/pom.xml
+++ b/gossip-transport-udp/pom.xml
@@ -36,8 +36,8 @@
<dependency>
<groupId>org.apache.gossip</groupId>
<artifactId>gossip-base</artifactId>
- <version>0.1.3-incubating-SNAPSHOT</version>
+ <version>${project.version}</version>
</dependency>
</dependencies>
-</project>
\ No newline at end of file
+</project>