You are viewing a plain text version of this content. The canonical link for it is here.
Posted to s4-commits@incubator.apache.org by mm...@apache.org on 2012/01/03 17:29:35 UTC
git commit: S4-36 : CommTests should use ZkBasedClusterManagement
Updated Branches:
refs/heads/piper 111959da9 -> 25c379f2c
S4-36 : CommTests should use ZkBasedClusterManagement
Project: http://git-wip-us.apache.org/repos/asf/incubator-s4/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-s4/commit/25c379f2
Tree: http://git-wip-us.apache.org/repos/asf/incubator-s4/tree/25c379f2
Diff: http://git-wip-us.apache.org/repos/asf/incubator-s4/diff/25c379f2
Branch: refs/heads/piper
Commit: 25c379f2c29c509d6892146636cb124ad4dd53b7
Parents: 111959d
Author: Karthik Kambatla <kk...@cs.purdue.edu>
Authored: Mon Dec 26 19:24:13 2011 -0500
Committer: Matthieu Morel <mm...@apache.org>
Committed: Tue Jan 3 18:19:53 2012 +0100
----------------------------------------------------------------------
.../java/org/apache/s4/comm/DeliveryTestUtil.java | 143 ++++++++++++
.../org/apache/s4/comm/SimpleDeliveryTest.java | 174 ---------------
.../test/java/org/apache/s4/comm/TCPCommTest.java | 84 -------
.../test/java/org/apache/s4/comm/UDPCommTest.java | 84 -------
.../java/org/apache/s4/comm/tcp/TCPCommTest.java | 69 ++++++
.../s4/comm/topology/AssignmentFromZKTest.java | 1 +
.../s4/comm/topology/TopologyFromZKTest.java | 1 +
.../java/org/apache/s4/comm/udp/UDPCommTest.java | 65 ++++++
.../FileBasedClusterManagementTestModule.java | 1 -
.../ZkBasedClusterManagementTestModule.java | 44 +++--
.../java/org/apache/s4/fixtures/ZkBasedTest.java | 48 ++++
.../src/test/resources/default.s4.properties | 9 +
.../org/apache/s4/fixtures/ZkBasedAppModule.java | 35 +++
.../apache/s4/wordcount/zk/WordCountModuleZk.java | 6 +-
.../apache/s4/wordcount/zk/WordCountTestZk.java | 68 +------
15 files changed, 402 insertions(+), 430 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/25c379f2/subprojects/s4-comm/src/test/java/org/apache/s4/comm/DeliveryTestUtil.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/test/java/org/apache/s4/comm/DeliveryTestUtil.java b/subprojects/s4-comm/src/test/java/org/apache/s4/comm/DeliveryTestUtil.java
new file mode 100644
index 0000000..19b4de1
--- /dev/null
+++ b/subprojects/s4-comm/src/test/java/org/apache/s4/comm/DeliveryTestUtil.java
@@ -0,0 +1,143 @@
+package org.apache.s4.comm;
+
+import org.apache.s4.base.Emitter;
+import org.apache.s4.base.Listener;
+
+import com.google.inject.Inject;
+import com.google.inject.name.Named;
+
+/*
+ * Test util for communication protocols.
+ *
+ * <ul>
+ * <li> The util defines Send and Receive Threads </li>
+ * <li> SendThread sends out a pre-defined number of messages to all the partitions </li>
+ * <li> ReceiveThread receives all/most of these messages </li>
+ * <li> To avoid the receiveThread waiting for ever, it spawns a TimerThread that would
+ * interrupt after a pre-defined but long enough interval </li>
+ * </ul>
+ *
+ */
+public class DeliveryTestUtil {
+
+ private final Emitter emitter;
+ private final Listener listener;
+ private final int interval;
+ private int numMessages;
+ private int sleepCount;
+
+ // public Thread sendThread, receiveThread;
+ private final int messagesExpected;
+
+ @Inject
+ public DeliveryTestUtil(Emitter emitter, Listener listener, @Named("emitter.send.interval") int interval,
+ @Named("emitter.send.numMessages") int numMessages, @Named("listener.recv.sleepCount") int sleepCount) {
+ this.emitter = emitter;
+ this.listener = listener;
+ this.interval = interval;
+ this.numMessages = numMessages;
+ this.sleepCount = sleepCount;
+ this.messagesExpected = numMessages * this.emitter.getPartitionCount();
+
+ // this.sendThread = new SendThread();
+ // this.receiveThread = new ReceiveThread();
+ }
+
+ public class SendThread extends Thread {
+ @Override
+ public void run() {
+ try {
+ for (int partition = 0; partition < emitter.getPartitionCount(); partition++) {
+ for (int i = 0; i < numMessages; i++) {
+ byte[] message = (new String("message-" + i)).getBytes();
+ emitter.send(partition, message);
+ Thread.sleep(interval);
+ }
+ }
+ } catch (Exception e) {
+ e.printStackTrace();
+ return;
+ }
+ }
+ }
+
+ /*
+ * TimerThread - interrupts the passed thread, after specified time-interval.
+ */
+ class TimerThread extends Thread {
+ private final Thread watchThread;
+ private Integer sleepCounter;
+
+ TimerThread(Thread watchThread) {
+ this.watchThread = watchThread;
+ this.sleepCounter = new Integer(sleepCount);
+ }
+
+ public void resetSleepCounter() {
+ synchronized (this.sleepCounter) {
+ this.sleepCounter = sleepCount;
+ }
+ }
+
+ public void clearSleepCounter() {
+ synchronized (this.sleepCounter) {
+ this.sleepCounter = 0;
+ }
+ }
+
+ private int getCounter() {
+ synchronized (this.sleepCounter) {
+ return this.sleepCounter--;
+ }
+ }
+
+ @Override
+ public void run() {
+ try {
+ while (getCounter() > 0) {
+ Thread.sleep(interval);
+ }
+ watchThread.interrupt();
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+ }
+
+ class ReceiveThread extends Thread {
+ private int messagesReceived = 0;
+
+ @Override
+ public void run() {
+
+ // start the timer thread to interrupt if blocked for too long
+ TimerThread timer = new TimerThread(this);
+ timer.start();
+ while (messagesReceived < messagesExpected) {
+ byte[] message = listener.recv();
+ timer.resetSleepCounter();
+ if (message != null)
+ messagesReceived++;
+ else
+ break;
+ }
+ timer.clearSleepCounter();
+ }
+
+ private boolean moreMessages() {
+ return ((messagesExpected - messagesReceived) > 0);
+ }
+ }
+
+ public Thread newSendThread() {
+ return new SendThread();
+ }
+
+ public Thread newReceiveThread() {
+ return new ReceiveThread();
+ }
+
+ public boolean moreMessages(Thread recvThread) {
+ return ((ReceiveThread) recvThread).moreMessages();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/25c379f2/subprojects/s4-comm/src/test/java/org/apache/s4/comm/SimpleDeliveryTest.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/test/java/org/apache/s4/comm/SimpleDeliveryTest.java b/subprojects/s4-comm/src/test/java/org/apache/s4/comm/SimpleDeliveryTest.java
deleted file mode 100644
index 5423361..0000000
--- a/subprojects/s4-comm/src/test/java/org/apache/s4/comm/SimpleDeliveryTest.java
+++ /dev/null
@@ -1,174 +0,0 @@
-package org.apache.s4.comm;
-
-import org.apache.s4.base.Emitter;
-import org.apache.s4.base.Listener;
-import org.junit.After;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
-
-import com.google.inject.Inject;
-import com.google.inject.name.Named;
-
-/*
- * Test class to test communication protocols. As comm-layer connections need to be
- * made including acquiring locks, the test is declared abstract and needs to be
- * extended with appropriate protocols.
- *
- * At a high-level, the test accomplishes the following:
- * <ul>
- * <li> Create Send and Receive Threads </li>
- * <li> SendThread sends out a pre-defined number of messages to all the partitions </li>
- * <li> ReceiveThread receives all/most of these messages </li>
- * <li> To avoid the receiveThread waiting for ever, it spawns a TimerThread that would
- * interrupt after a pre-defined but long enough interval </li>
- * <li> The receive thread reports on number of messages received and dropped </li>
- * </ul>
- *
- */
-public abstract class SimpleDeliveryTest {
- protected CommWrapper sdt;
- protected String lockdir;
-
- static class CommWrapper {
- private static final int MESSAGE_COUNT = 200;
- private static final int TIMER_THREAD_COUNT = 100;
-
- private final Emitter emitter;
- private final Listener listener;
- private final int interval;
-
- public Thread sendThread, receiveThread;
- private final int messagesExpected;
- private int messagesReceived = 0;
-
- @Inject
- public CommWrapper(@Named("emitter.send.interval") int interval, Emitter emitter, Listener listener) {
- this.emitter = emitter;
- this.listener = listener;
- this.interval = interval;
- this.messagesExpected = MESSAGE_COUNT * this.emitter.getPartitionCount();
-
- this.sendThread = new SendThread();
- this.receiveThread = new ReceiveThread();
- }
-
- public boolean moreMessages() {
- return ((messagesExpected - messagesReceived) > 0);
- }
-
- class SendThread extends Thread {
- @Override
- public void run() {
- try {
- for (int partition = 0; partition < emitter.getPartitionCount(); partition++) {
- for (int i = 0; i < MESSAGE_COUNT; i++) {
- byte[] message = (new String("message-" + i)).getBytes();
- emitter.send(partition, message);
- Thread.sleep(interval);
- }
- }
- } catch (Exception e) {
- e.printStackTrace();
- return;
- }
- }
- }
-
- /*
- * TimerThread - interrupts the passed thread, after specified time-interval.
- */
- class TimerThread extends Thread {
- private final Thread watchThread;
- private Integer sleepCounter;
-
- TimerThread(Thread watchThread) {
- this.watchThread = watchThread;
- this.sleepCounter = new Integer(TIMER_THREAD_COUNT);
- }
-
- public void resetSleepCounter() {
- synchronized (this.sleepCounter) {
- this.sleepCounter = TIMER_THREAD_COUNT;
- }
- }
-
- public void clearSleepCounter() {
- synchronized (this.sleepCounter) {
- this.sleepCounter = 0;
- }
- }
-
- private int getCounter() {
- synchronized (this.sleepCounter) {
- return this.sleepCounter--;
- }
- }
-
- @Override
- public void run() {
- try {
- while (getCounter() > 0) {
- Thread.sleep(interval);
- }
- watchThread.interrupt();
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
- }
-
- class ReceiveThread extends Thread {
- @Override
- public void run() {
-
- // start the timer thread to interrupt if blocked for too long
- TimerThread timer = new TimerThread(this);
- timer.start();
- while (messagesReceived < messagesExpected) {
- byte[] message = listener.recv();
- timer.resetSleepCounter();
- if (message != null)
- messagesReceived++;
- else
- break;
- }
- timer.clearSleepCounter();
- }
- }
- }
-
- /*
- * All tests extending this class need to implement this method
- */
- @Before
- public abstract void setup();
-
- /**
- * Tests the protocol. If all components function without throwing exceptions, the test passes. The test also
- * reports the loss of messages, if any.
- *
- * @throws InterruptedException
- */
- @Test
- public void testCommLayerProtocol() throws InterruptedException {
- try {
- // start send and receive threads
- sdt.sendThread.start();
- sdt.receiveThread.start();
-
- // wait for them to finish
- sdt.sendThread.join();
- sdt.receiveThread.join();
-
- Assert.assertTrue("Guaranteed message delivery", !sdt.moreMessages());
- } catch (Exception e) {
- e.printStackTrace();
- Assert.fail("The comm protocol has failed basic functionality test");
- }
-
- Assert.assertTrue("The comm protocol seems to be working crash-free", true);
-
- System.out.println("Done");
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/25c379f2/subprojects/s4-comm/src/test/java/org/apache/s4/comm/TCPCommTest.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/test/java/org/apache/s4/comm/TCPCommTest.java b/subprojects/s4-comm/src/test/java/org/apache/s4/comm/TCPCommTest.java
deleted file mode 100644
index b2ee5e1..0000000
--- a/subprojects/s4-comm/src/test/java/org/apache/s4/comm/TCPCommTest.java
+++ /dev/null
@@ -1,84 +0,0 @@
-package org.apache.s4.comm;
-
-import java.io.InputStream;
-
-import org.apache.commons.configuration.ConfigurationConverter;
-import org.apache.commons.configuration.ConfigurationException;
-import org.apache.commons.configuration.ConfigurationUtils;
-import org.apache.commons.configuration.PropertiesConfiguration;
-import org.apache.s4.base.Emitter;
-import org.apache.s4.base.Hasher;
-import org.apache.s4.base.Listener;
-import org.apache.s4.base.SerializerDeserializer;
-import org.apache.s4.comm.serialize.KryoSerDeser;
-import org.apache.s4.comm.tcp.TCPEmitter;
-import org.apache.s4.comm.tcp.TCPListener;
-import org.apache.s4.comm.topology.Assignment;
-import org.apache.s4.comm.topology.AssignmentFromFile;
-import org.apache.s4.comm.topology.Cluster;
-import org.apache.s4.comm.topology.Topology;
-import org.apache.s4.comm.topology.TopologyFromFile;
-
-import com.google.inject.AbstractModule;
-import com.google.inject.Binder;
-import com.google.inject.Guice;
-import com.google.inject.Injector;
-import com.google.inject.name.Names;
-
-public class TCPCommTest extends SimpleDeliveryTest {
-
- @Override
- public void setup() {
- Injector injector = Guice.createInjector(new NettyTestModule());
- sdt = injector.getInstance(CommWrapper.class);
- }
-
- class NettyTestModule extends AbstractModule {
-
- protected PropertiesConfiguration config = null;
-
- private void loadProperties(Binder binder) {
-
- try {
- InputStream is = this.getClass().getResourceAsStream("/s4-comm-test.properties");
- config = new PropertiesConfiguration();
- config.load(is);
-
- System.out.println(ConfigurationUtils.toString(config));
- Names.bindProperties(binder, ConfigurationConverter.getProperties(config));
- } catch (ConfigurationException e) {
- binder.addError(e);
- e.printStackTrace();
- }
- }
-
- @Override
- protected void configure() {
- if (config == null)
- loadProperties(binder());
-
- int numHosts = config.getList("cluster.hosts").size();
- boolean isCluster = numHosts > 1 ? true : false;
- bind(Boolean.class).annotatedWith(Names.named("isCluster")).toInstance(Boolean.valueOf(isCluster));
-
- bind(Cluster.class);
-
- bind(Assignment.class).to(AssignmentFromFile.class);
-
- bind(Topology.class).to(TopologyFromFile.class);
-
- /* Use a simple UDP comm layer implementation. */
- bind(Listener.class).to(TCPListener.class);
- bind(Emitter.class).to(TCPEmitter.class);
-
- /* The hashing function to map keys top partitions. */
- bind(Hasher.class).to(DefaultHasher.class);
-
- /* Use Kryo to serialize events. */
- bind(SerializerDeserializer.class).to(KryoSerDeser.class);
-
- bind(Integer.class).annotatedWith(Names.named("emitter.send.interval")).toInstance(
- config.getInt("emitter.send.interval"));
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/25c379f2/subprojects/s4-comm/src/test/java/org/apache/s4/comm/UDPCommTest.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/test/java/org/apache/s4/comm/UDPCommTest.java b/subprojects/s4-comm/src/test/java/org/apache/s4/comm/UDPCommTest.java
deleted file mode 100644
index 9d08193..0000000
--- a/subprojects/s4-comm/src/test/java/org/apache/s4/comm/UDPCommTest.java
+++ /dev/null
@@ -1,84 +0,0 @@
-package org.apache.s4.comm;
-
-import java.io.InputStream;
-
-import org.apache.commons.configuration.ConfigurationConverter;
-import org.apache.commons.configuration.ConfigurationException;
-import org.apache.commons.configuration.ConfigurationUtils;
-import org.apache.commons.configuration.PropertiesConfiguration;
-import org.apache.s4.base.Emitter;
-import org.apache.s4.base.Hasher;
-import org.apache.s4.base.Listener;
-import org.apache.s4.base.SerializerDeserializer;
-import org.apache.s4.comm.serialize.KryoSerDeser;
-import org.apache.s4.comm.topology.Assignment;
-import org.apache.s4.comm.topology.AssignmentFromFile;
-import org.apache.s4.comm.topology.Cluster;
-import org.apache.s4.comm.topology.Topology;
-import org.apache.s4.comm.topology.TopologyFromFile;
-import org.apache.s4.comm.udp.UDPEmitter;
-import org.apache.s4.comm.udp.UDPListener;
-
-import com.google.inject.AbstractModule;
-import com.google.inject.Binder;
-import com.google.inject.Guice;
-import com.google.inject.Injector;
-import com.google.inject.name.Names;
-
-public class UDPCommTest extends SimpleDeliveryTest {
-
- @Override
- public void setup() {
- Injector injector = Guice.createInjector(new UDPCommTestModule());
- sdt = injector.getInstance(CommWrapper.class);
- }
-
- class UDPCommTestModule extends AbstractModule {
-
- protected PropertiesConfiguration config = null;
-
- private void loadProperties(Binder binder) {
-
- try {
- InputStream is = this.getClass().getResourceAsStream("/s4-comm-test.properties");
- config = new PropertiesConfiguration();
- config.load(is);
-
- System.out.println(ConfigurationUtils.toString(config));
- Names.bindProperties(binder, ConfigurationConverter.getProperties(config));
- } catch (ConfigurationException e) {
- binder.addError(e);
- e.printStackTrace();
- }
- }
-
- @Override
- protected void configure() {
- if (config == null)
- loadProperties(binder());
-
- int numHosts = config.getList("cluster.hosts").size();
- boolean isCluster = numHosts > 1 ? true : false;
- bind(Boolean.class).annotatedWith(Names.named("isCluster")).toInstance(Boolean.valueOf(isCluster));
-
- bind(Cluster.class);
-
- bind(Assignment.class).to(AssignmentFromFile.class);
-
- bind(Topology.class).to(TopologyFromFile.class);
-
- /* Use a simple UDP comm layer implementation. */
- bind(Listener.class).to(UDPListener.class);
- bind(Emitter.class).to(UDPEmitter.class);
-
- /* The hashing function to map keys top partitions. */
- bind(Hasher.class).to(DefaultHasher.class);
-
- /* Use Kryo to serialize events. */
- bind(SerializerDeserializer.class).to(KryoSerDeser.class);
-
- bind(Integer.class).annotatedWith(Names.named("emitter.send.interval")).toInstance(
- config.getInt("emitter.send.interval"));
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/25c379f2/subprojects/s4-comm/src/test/java/org/apache/s4/comm/tcp/TCPCommTest.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/test/java/org/apache/s4/comm/tcp/TCPCommTest.java b/subprojects/s4-comm/src/test/java/org/apache/s4/comm/tcp/TCPCommTest.java
new file mode 100644
index 0000000..f02bf64
--- /dev/null
+++ b/subprojects/s4-comm/src/test/java/org/apache/s4/comm/tcp/TCPCommTest.java
@@ -0,0 +1,69 @@
+package org.apache.s4.comm.tcp;
+
+import java.io.IOException;
+import org.apache.s4.comm.DeliveryTestUtil;
+import org.apache.s4.comm.tcp.TCPEmitter;
+import org.apache.s4.comm.tcp.TCPListener;
+import org.apache.s4.fixtures.ZkBasedClusterManagementTestModule;
+import org.apache.s4.fixtures.ZkBasedTest;
+import org.apache.zookeeper.KeeperException;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import com.google.inject.Guice;
+import com.google.inject.Injector;
+import com.google.inject.name.Names;
+
+public class TCPCommTest extends ZkBasedTest {
+ DeliveryTestUtil util;
+
+ @Before
+ public void setup() throws IOException, InterruptedException, KeeperException {
+ Injector injector = Guice.createInjector(new TCPCommTestModule());
+ util = injector.getInstance(DeliveryTestUtil.class);
+ }
+
+ class TCPCommTestModule extends ZkBasedClusterManagementTestModule {
+ TCPCommTestModule() {
+ super(TCPEmitter.class, TCPListener.class);
+ }
+
+ @Override
+ protected void configure() {
+ super.configure();
+ bind(Integer.class).annotatedWith(Names.named("emitter.send.interval")).toInstance(100);
+ bind(Integer.class).annotatedWith(Names.named("emitter.send.numMessages")).toInstance(200);
+ bind(Integer.class).annotatedWith(Names.named("listener.recv.sleepCount")).toInstance(10);
+ }
+ }
+
+ /**
+ * Tests the protocol. If all components function without throwing exceptions, the test passes. The test also
+ * reports the loss of messages, if any.
+ *
+ * @throws InterruptedException
+ */
+ @Test
+ public void testTCPDelivery() throws InterruptedException {
+ try {
+ Thread sendThread = util.newSendThread();
+ Thread receiveThread = util.newReceiveThread();
+
+ // start send and receive threads
+ sendThread.start();
+ receiveThread.start();
+
+ // wait for them to finish
+ sendThread.join();
+ receiveThread.join();
+
+ Assert.assertTrue("Guaranteed message delivery", !util.moreMessages(receiveThread));
+ } catch (Exception e) {
+ e.printStackTrace();
+ Assert.fail("TCP has failed basic functionality test");
+ }
+
+ System.out.println("Done");
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/25c379f2/subprojects/s4-comm/src/test/java/org/apache/s4/comm/topology/AssignmentFromZKTest.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/test/java/org/apache/s4/comm/topology/AssignmentFromZKTest.java b/subprojects/s4-comm/src/test/java/org/apache/s4/comm/topology/AssignmentFromZKTest.java
index a82a490..d733e9b 100644
--- a/subprojects/s4-comm/src/test/java/org/apache/s4/comm/topology/AssignmentFromZKTest.java
+++ b/subprojects/s4-comm/src/test/java/org/apache/s4/comm/topology/AssignmentFromZKTest.java
@@ -19,6 +19,7 @@ public class AssignmentFromZKTest extends ZKBaseTest {
for (int i = 0; i < 10; i++) {
Runnable runnable = new Runnable() {
+ @SuppressWarnings("unused")
@Override
public void run() {
AssignmentFromZK assignmentFromZK;
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/25c379f2/subprojects/s4-comm/src/test/java/org/apache/s4/comm/topology/TopologyFromZKTest.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/test/java/org/apache/s4/comm/topology/TopologyFromZKTest.java b/subprojects/s4-comm/src/test/java/org/apache/s4/comm/topology/TopologyFromZKTest.java
index 6c95784..eb5f42c 100644
--- a/subprojects/s4-comm/src/test/java/org/apache/s4/comm/topology/TopologyFromZKTest.java
+++ b/subprojects/s4-comm/src/test/java/org/apache/s4/comm/topology/TopologyFromZKTest.java
@@ -44,6 +44,7 @@ public class TopologyFromZKTest extends ZKBaseTest {
for (int i = 0; i < 10; i++) {
Runnable runnable = new Runnable() {
+ @SuppressWarnings("unused")
@Override
public void run() {
AssignmentFromZK assignmentFromZK;
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/25c379f2/subprojects/s4-comm/src/test/java/org/apache/s4/comm/udp/UDPCommTest.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/test/java/org/apache/s4/comm/udp/UDPCommTest.java b/subprojects/s4-comm/src/test/java/org/apache/s4/comm/udp/UDPCommTest.java
new file mode 100644
index 0000000..808a357
--- /dev/null
+++ b/subprojects/s4-comm/src/test/java/org/apache/s4/comm/udp/UDPCommTest.java
@@ -0,0 +1,65 @@
+package org.apache.s4.comm.udp;
+
+import java.io.IOException;
+
+import org.apache.s4.comm.DeliveryTestUtil;
+import org.apache.s4.fixtures.ZkBasedClusterManagementTestModule;
+import org.apache.s4.fixtures.ZkBasedTest;
+import org.apache.zookeeper.KeeperException;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import com.google.inject.Guice;
+import com.google.inject.Injector;
+import com.google.inject.name.Names;
+
+public class UDPCommTest extends ZkBasedTest {
+ DeliveryTestUtil util;
+
+ @Before
+ public void setup() throws IOException, InterruptedException, KeeperException {
+ Injector injector = Guice.createInjector(new UDPCommTestModule());
+ util = injector.getInstance(DeliveryTestUtil.class);
+ }
+
+ class UDPCommTestModule extends ZkBasedClusterManagementTestModule {
+ UDPCommTestModule() {
+ super(UDPEmitter.class, UDPListener.class);
+ }
+
+ @Override
+ protected void configure() {
+ super.configure();
+ bind(Integer.class).annotatedWith(Names.named("emitter.send.interval")).toInstance(100);
+ bind(Integer.class).annotatedWith(Names.named("emitter.send.numMessages")).toInstance(200);
+ bind(Integer.class).annotatedWith(Names.named("listener.recv.sleepCount")).toInstance(10);
+ }
+ }
+
+ /**
+ * Tests the protocol. If all components function without throwing exceptions, the test passes.
+ *
+ * @throws InterruptedException
+ */
+ @Test
+ public void testUDPDelivery() throws InterruptedException {
+ try {
+ Thread sendThread = util.newSendThread();
+ Thread receiveThread = util.newReceiveThread();
+
+ // start send and receive threads
+ sendThread.start();
+ receiveThread.start();
+
+ // wait for them to finish
+ sendThread.join();
+ receiveThread.join();
+
+ } catch (Exception e) {
+ e.printStackTrace();
+ Assert.fail("UDP has failed basic functionality test");
+ }
+ System.out.println("Done");
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/25c379f2/subprojects/s4-comm/src/test/java/org/apache/s4/fixtures/FileBasedClusterManagementTestModule.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/test/java/org/apache/s4/fixtures/FileBasedClusterManagementTestModule.java b/subprojects/s4-comm/src/test/java/org/apache/s4/fixtures/FileBasedClusterManagementTestModule.java
index b189350..ebcf388 100644
--- a/subprojects/s4-comm/src/test/java/org/apache/s4/fixtures/FileBasedClusterManagementTestModule.java
+++ b/subprojects/s4-comm/src/test/java/org/apache/s4/fixtures/FileBasedClusterManagementTestModule.java
@@ -16,7 +16,6 @@ import org.apache.s4.comm.DefaultHasher;
import org.apache.s4.comm.serialize.KryoSerDeser;
import org.apache.s4.comm.topology.Assignment;
import org.apache.s4.comm.topology.AssignmentFromFile;
-import org.apache.s4.comm.topology.AssignmentFromZK;
import org.apache.s4.comm.topology.Cluster;
import org.apache.s4.comm.topology.Topology;
import org.apache.s4.comm.topology.TopologyFromFile;
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/25c379f2/subprojects/s4-comm/src/test/java/org/apache/s4/fixtures/ZkBasedClusterManagementTestModule.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/test/java/org/apache/s4/fixtures/ZkBasedClusterManagementTestModule.java b/subprojects/s4-comm/src/test/java/org/apache/s4/fixtures/ZkBasedClusterManagementTestModule.java
index be3739b..a368db8 100644
--- a/subprojects/s4-comm/src/test/java/org/apache/s4/fixtures/ZkBasedClusterManagementTestModule.java
+++ b/subprojects/s4-comm/src/test/java/org/apache/s4/fixtures/ZkBasedClusterManagementTestModule.java
@@ -1,8 +1,6 @@
package org.apache.s4.fixtures;
import java.io.InputStream;
-import java.lang.reflect.ParameterizedType;
-import java.lang.reflect.Type;
import org.apache.commons.configuration.ConfigurationConverter;
import org.apache.commons.configuration.ConfigurationException;
@@ -21,25 +19,25 @@ import org.apache.s4.comm.topology.AssignmentFromZK;
import org.apache.s4.comm.topology.Cluster;
import org.apache.s4.comm.topology.Topology;
import org.apache.s4.comm.topology.TopologyFromZK;
-import org.apache.s4.comm.udp.UDPEmitter;
-import org.apache.s4.comm.udp.UDPListener;
import com.google.inject.AbstractModule;
import com.google.inject.Binder;
import com.google.inject.name.Names;
-// also uses netty
-public class ZkBasedClusterManagementTestModule<T> extends AbstractModule {
+public class ZkBasedClusterManagementTestModule extends AbstractModule {
protected PropertiesConfiguration config = null;
- private final Class<?> appClass;
+
+ private Class<? extends Emitter> emitterClass = null;
+ private Class<? extends Listener> listenerClass = null;
protected ZkBasedClusterManagementTestModule() {
- // infer actual app class through "super type tokens" (this simple code
- // assumes actual module class is a direct subclass from this one)
- ParameterizedType pt = (ParameterizedType) getClass().getGenericSuperclass();
- Type[] fieldArgTypes = pt.getActualTypeArguments();
- this.appClass = (Class<?>) fieldArgTypes[0];
+ }
+
+ protected ZkBasedClusterManagementTestModule(Class<? extends Emitter> emitterClass,
+ Class<? extends Listener> listenerClass) {
+ this.emitterClass = emitterClass;
+ this.listenerClass = listenerClass;
}
private void loadProperties(Binder binder) {
@@ -48,8 +46,10 @@ public class ZkBasedClusterManagementTestModule<T> extends AbstractModule {
InputStream is = this.getClass().getResourceAsStream("/default.s4.properties");
config = new PropertiesConfiguration();
config.load(is);
- config.setProperty("cluster.zk_address",
- config.getString("cluster.zk_address").replaceFirst("\\w+:\\d+", "localhost:" + CommTestUtils.ZK_PORT));
+ config.setProperty(
+ "cluster.zk_address",
+ config.getString("cluster.zk_address").replaceFirst("\\w+:\\d+",
+ "localhost:" + CommTestUtils.ZK_PORT));
System.out.println(ConfigurationUtils.toString(config));
// TODO - validate properties.
@@ -66,14 +66,22 @@ public class ZkBasedClusterManagementTestModule<T> extends AbstractModule {
if (config == null) {
loadProperties(binder());
}
- bind(appClass);
bind(Cluster.class);
bind(Hasher.class).to(DefaultHasher.class);
bind(SerializerDeserializer.class).to(KryoSerDeser.class);
bind(Assignment.class).to(AssignmentFromZK.class);
bind(Topology.class).to(TopologyFromZK.class);
- bind(Emitter.class).to(TCPEmitter.class);
- bind(Listener.class).to(TCPListener.class);
- }
+ if (this.emitterClass != null) {
+ bind(Emitter.class).to(this.emitterClass);
+ } else {
+ bind(Emitter.class).to(TCPEmitter.class);
+ }
+
+ if (this.listenerClass != null) {
+ bind(Listener.class).to(this.listenerClass);
+ } else {
+ bind(Listener.class).to(TCPListener.class);
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/25c379f2/subprojects/s4-comm/src/test/java/org/apache/s4/fixtures/ZkBasedTest.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/test/java/org/apache/s4/fixtures/ZkBasedTest.java b/subprojects/s4-comm/src/test/java/org/apache/s4/fixtures/ZkBasedTest.java
new file mode 100644
index 0000000..02c5467
--- /dev/null
+++ b/subprojects/s4-comm/src/test/java/org/apache/s4/fixtures/ZkBasedTest.java
@@ -0,0 +1,48 @@
+package org.apache.s4.fixtures;
+
+import java.io.File;
+
+import org.I0Itec.zkclient.IDefaultNameSpace;
+import org.I0Itec.zkclient.ZkClient;
+import org.I0Itec.zkclient.ZkServer;
+import org.apache.s4.comm.tools.TaskSetup;
+import org.junit.Before;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public abstract class ZkBasedTest {
+ private static final Logger logger = LoggerFactory.getLogger(ZkBasedTest.class);
+ private ZkServer zkServer;
+
+ @Before
+ public void prepare() {
+ String dataDir = CommTestUtils.DEFAULT_TEST_OUTPUT_DIR + File.separator + "zookeeper" + File.separator + "data";
+ String logDir = CommTestUtils.DEFAULT_TEST_OUTPUT_DIR + File.separator + "zookeeper" + File.separator + "logs";
+ CommTestUtils.cleanupTmpDirs();
+
+ IDefaultNameSpace defaultNameSpace = new IDefaultNameSpace() {
+
+ @Override
+ public void createDefaultNameSpace(ZkClient zkClient) {
+
+ }
+ };
+
+ logger.info("Starting Zookeeper Server");
+ zkServer = new ZkServer(dataDir, logDir, defaultNameSpace, CommTestUtils.ZK_PORT);
+ zkServer.start();
+
+ logger.info("Starting Zookeeper Client 1");
+ String zookeeperAddress = "localhost:" + CommTestUtils.ZK_PORT;
+ @SuppressWarnings("unused")
+ ZkClient zkClient = new ZkClient(zookeeperAddress, 10000, 10000);
+
+ ZkClient zkClient2 = new ZkClient(zookeeperAddress, 10000, 10000);
+ zkClient2.getCreationTime("/");
+
+ TaskSetup taskSetup = new TaskSetup(zookeeperAddress);
+ final String clusterName = "s4-test-cluster";
+ taskSetup.clean(clusterName);
+ taskSetup.setup(clusterName, 1);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/25c379f2/subprojects/s4-comm/src/test/resources/default.s4.properties
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/test/resources/default.s4.properties b/subprojects/s4-comm/src/test/resources/default.s4.properties
new file mode 100644
index 0000000..0e31dfa
--- /dev/null
+++ b/subprojects/s4-comm/src/test/resources/default.s4.properties
@@ -0,0 +1,9 @@
+comm.queue_emmiter_size = 8000
+comm.queue_listener_size = 8000
+cluster.hosts = localhost
+cluster.ports = 5077
+cluster.lock_dir = {user.dir}/tmp
+cluster.name = s4-test-cluster
+cluster.zk_address = localhost:2181
+cluster.zk_session_timeout = 10000
+cluster.zk_connection_timeout = 10000
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/25c379f2/subprojects/s4-core/src/test/java/org/apache/s4/fixtures/ZkBasedAppModule.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/test/java/org/apache/s4/fixtures/ZkBasedAppModule.java b/subprojects/s4-core/src/test/java/org/apache/s4/fixtures/ZkBasedAppModule.java
new file mode 100644
index 0000000..cff6474
--- /dev/null
+++ b/subprojects/s4-core/src/test/java/org/apache/s4/fixtures/ZkBasedAppModule.java
@@ -0,0 +1,35 @@
+package org.apache.s4.fixtures;
+
+import java.lang.reflect.ParameterizedType;
+import java.lang.reflect.Type;
+
+import org.apache.s4.base.Emitter;
+import org.apache.s4.base.Listener;
+
+public class ZkBasedAppModule<T> extends ZkBasedClusterManagementTestModule {
+ private final Class<?> appClass;
+
+ private Class<?> findAppClass() {
+ // infer actual app class through "super type tokens" (this simple code
+ // assumes actual module class is a direct subclass from this one)
+ ParameterizedType pt = (ParameterizedType) getClass().getGenericSuperclass();
+ Type[] fieldArgTypes = pt.getActualTypeArguments();
+ return (Class<?>) fieldArgTypes[0];
+ }
+
+ protected ZkBasedAppModule() {
+ super();
+ this.appClass = findAppClass();
+ }
+
+ protected ZkBasedAppModule(Class<? extends Emitter> emitterClass, Class<? extends Listener> listenerClass) {
+ super(emitterClass, listenerClass);
+ this.appClass = findAppClass();
+ }
+
+ @Override
+ protected void configure() {
+ super.configure();
+ bind(appClass);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/25c379f2/subprojects/s4-core/src/test/java/org/apache/s4/wordcount/zk/WordCountModuleZk.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/test/java/org/apache/s4/wordcount/zk/WordCountModuleZk.java b/subprojects/s4-core/src/test/java/org/apache/s4/wordcount/zk/WordCountModuleZk.java
index 87814c5..322690a 100644
--- a/subprojects/s4-core/src/test/java/org/apache/s4/wordcount/zk/WordCountModuleZk.java
+++ b/subprojects/s4-core/src/test/java/org/apache/s4/wordcount/zk/WordCountModuleZk.java
@@ -1,9 +1,7 @@
package org.apache.s4.wordcount.zk;
-import org.apache.s4.fixtures.ZkBasedClusterManagementTestModule;
+import org.apache.s4.fixtures.ZkBasedAppModule;
import org.apache.s4.wordcount.WordCountApp;
-
-public class WordCountModuleZk extends ZkBasedClusterManagementTestModule<WordCountApp> {
-
+public class WordCountModuleZk extends ZkBasedAppModule<WordCountApp> {
}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/25c379f2/subprojects/s4-core/src/test/java/org/apache/s4/wordcount/zk/WordCountTestZk.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/test/java/org/apache/s4/wordcount/zk/WordCountTestZk.java b/subprojects/s4-core/src/test/java/org/apache/s4/wordcount/zk/WordCountTestZk.java
index 3db216b..fb1d249 100644
--- a/subprojects/s4-core/src/test/java/org/apache/s4/wordcount/zk/WordCountTestZk.java
+++ b/subprojects/s4-core/src/test/java/org/apache/s4/wordcount/zk/WordCountTestZk.java
@@ -1,83 +1,22 @@
package org.apache.s4.wordcount.zk;
import static org.apache.s4.wordcount.WordCountTest.*;
-import static org.junit.Assert.*;
import java.io.File;
import java.util.concurrent.CountDownLatch;
import junit.framework.Assert;
-import org.I0Itec.zkclient.IDefaultNameSpace;
-import org.I0Itec.zkclient.ZkClient;
-import org.I0Itec.zkclient.ZkServer;
-import org.apache.s4.comm.tools.TaskSetup;
-import org.apache.s4.comm.topology.AssignmentFromZK;
-import org.apache.s4.comm.topology.ClusterNode;
import org.apache.s4.core.App;
import org.apache.s4.fixtures.CommTestUtils;
+import org.apache.s4.fixtures.ZkBasedTest;
import org.apache.s4.wordcount.WordCountApp;
-import org.apache.s4.wordcount.WordCountModule;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.ZooDefs.Ids;
-import org.junit.Before;
-import org.junit.Test;
-
-
-public class WordCountTestZk {
-
- private ZkServer zkServer;
- private ZkClient zkClient;
-
- @Before
- public void prepare() {
-
- String dataDir = CommTestUtils.DEFAULT_TEST_OUTPUT_DIR + File.separator + "zookeeper" + File.separator + "data";
- String logDir = CommTestUtils.DEFAULT_TEST_OUTPUT_DIR + File.separator + "zookeeper" + File.separator + "logs";
- CommTestUtils.cleanupTmpDirs();
-
- IDefaultNameSpace defaultNameSpace = new IDefaultNameSpace() {
- @Override
- public void createDefaultNameSpace(ZkClient zkClient) {
-
- }
- };
-
- zkServer = new ZkServer(dataDir, logDir, defaultNameSpace, CommTestUtils.ZK_PORT);
- zkServer.start();
-
- // zkClient = zkServer.getZkClient();
- String zookeeperAddress = "localhost:" + CommTestUtils.ZK_PORT;
- zkClient = new ZkClient(zookeeperAddress, 10000, 10000);
-
- ZkClient zkClient2 = new ZkClient(zookeeperAddress, 10000, 10000);
- zkClient2.getCreationTime("/");
- TaskSetup taskSetup = new TaskSetup(zookeeperAddress);
- final String clusterName = "s4-test-cluster";
- taskSetup.clean(clusterName);
- taskSetup.setup(clusterName, 1);
- // final CountDownLatch latch = new CountDownLatch(10);
- // for (int i = 0; i < 10; i++) {
- // Runnable runnable = new Runnable() {
- //
- // @Override
- // public void run() {
- // AssignmentFromZK assignmentFromZK;
- // try {
- // assignmentFromZK = new AssignmentFromZK(clusterName, zookeeperAddress, 30000, 30000);
- // ClusterNode assignClusterNode = assignmentFromZK.assignClusterNode();
- // latch.countDown();
- // } catch (Exception e) {
- // e.printStackTrace();
- // }
- // }
- // };
- // Thread t = new Thread(runnable);
- // t.start();
- // }
- }
+import org.junit.Test;
+public class WordCountTestZk extends ZkBasedTest {
@Test
public void test() throws Exception {
@@ -99,6 +38,5 @@ public class WordCountTestZk {
File results = new File(CommTestUtils.DEFAULT_TEST_OUTPUT_DIR + File.separator + "wordcount");
String s = CommTestUtils.readFile(results);
Assert.assertEquals("be=2;da=2;doobie=5;not=1;or=1;to=2;", s);
-
}
}