You are viewing a plain text version of this content. The canonical link for it is here.
Posted to s4-commits@incubator.apache.org by mm...@apache.org on 2012/01/03 17:29:35 UTC

git commit: S4-36 : CommTests should use ZkBasedClusterManagement

Updated Branches:
  refs/heads/piper 111959da9 -> 25c379f2c


S4-36 : CommTests should use ZkBasedClusterManagement


Project: http://git-wip-us.apache.org/repos/asf/incubator-s4/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-s4/commit/25c379f2
Tree: http://git-wip-us.apache.org/repos/asf/incubator-s4/tree/25c379f2
Diff: http://git-wip-us.apache.org/repos/asf/incubator-s4/diff/25c379f2

Branch: refs/heads/piper
Commit: 25c379f2c29c509d6892146636cb124ad4dd53b7
Parents: 111959d
Author: Karthik Kambatla <kk...@cs.purdue.edu>
Authored: Mon Dec 26 19:24:13 2011 -0500
Committer: Matthieu Morel <mm...@apache.org>
Committed: Tue Jan 3 18:19:53 2012 +0100

----------------------------------------------------------------------
 .../java/org/apache/s4/comm/DeliveryTestUtil.java  |  143 ++++++++++++
 .../org/apache/s4/comm/SimpleDeliveryTest.java     |  174 ---------------
 .../test/java/org/apache/s4/comm/TCPCommTest.java  |   84 -------
 .../test/java/org/apache/s4/comm/UDPCommTest.java  |   84 -------
 .../java/org/apache/s4/comm/tcp/TCPCommTest.java   |   69 ++++++
 .../s4/comm/topology/AssignmentFromZKTest.java     |    1 +
 .../s4/comm/topology/TopologyFromZKTest.java       |    1 +
 .../java/org/apache/s4/comm/udp/UDPCommTest.java   |   65 ++++++
 .../FileBasedClusterManagementTestModule.java      |    1 -
 .../ZkBasedClusterManagementTestModule.java        |   44 +++--
 .../java/org/apache/s4/fixtures/ZkBasedTest.java   |   48 ++++
 .../src/test/resources/default.s4.properties       |    9 +
 .../org/apache/s4/fixtures/ZkBasedAppModule.java   |   35 +++
 .../apache/s4/wordcount/zk/WordCountModuleZk.java  |    6 +-
 .../apache/s4/wordcount/zk/WordCountTestZk.java    |   68 +------
 15 files changed, 402 insertions(+), 430 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/25c379f2/subprojects/s4-comm/src/test/java/org/apache/s4/comm/DeliveryTestUtil.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/test/java/org/apache/s4/comm/DeliveryTestUtil.java b/subprojects/s4-comm/src/test/java/org/apache/s4/comm/DeliveryTestUtil.java
new file mode 100644
index 0000000..19b4de1
--- /dev/null
+++ b/subprojects/s4-comm/src/test/java/org/apache/s4/comm/DeliveryTestUtil.java
@@ -0,0 +1,143 @@
+package org.apache.s4.comm;
+
+import org.apache.s4.base.Emitter;
+import org.apache.s4.base.Listener;
+
+import com.google.inject.Inject;
+import com.google.inject.name.Named;
+
+/**
+ * Test util for communication protocols.
+ *
+ * <ul>
+ * <li> The util defines Send and Receive Threads </li>
+ * <li> SendThread sends out a pre-defined number of messages to all the partitions </li>
+ * <li> ReceiveThread receives all/most of these messages </li>
+ * <li> To avoid the ReceiveThread waiting forever, it spawns a TimerThread that
+ * interrupts it after a pre-defined but long enough interval </li>
+ * </ul>
+ *
+ */
+public class DeliveryTestUtil {
+
+    private final Emitter emitter;
+    private final Listener listener;
+    private final int interval;
+    private int numMessages;
+    private int sleepCount;
+
+    // public Thread sendThread, receiveThread;
+    private final int messagesExpected;
+
+    @Inject
+    public DeliveryTestUtil(Emitter emitter, Listener listener, @Named("emitter.send.interval") int interval,
+            @Named("emitter.send.numMessages") int numMessages, @Named("listener.recv.sleepCount") int sleepCount) {
+        this.emitter = emitter;
+        this.listener = listener;
+        this.interval = interval;
+        this.numMessages = numMessages;
+        this.sleepCount = sleepCount;
+        this.messagesExpected = numMessages * this.emitter.getPartitionCount();
+
+        // this.sendThread = new SendThread();
+        // this.receiveThread = new ReceiveThread();
+    }
+
+    public class SendThread extends Thread {
+        @Override
+        public void run() {
+            try {
+                for (int partition = 0; partition < emitter.getPartitionCount(); partition++) {
+                    for (int i = 0; i < numMessages; i++) {
+                        byte[] message = (new String("message-" + i)).getBytes(); // platform default charset
+                        emitter.send(partition, message);
+                        Thread.sleep(interval); // pause between consecutive sends
+                    }
+                }
+            } catch (Exception e) {
+                e.printStackTrace(); // any failure, including interruption, ends the send loop
+                return;
+            }
+        }
+    }
+
+    /*
+     * TimerThread - interrupts the watched thread when its countdown of sleep intervals reaches zero.
+     */
+    class TimerThread extends Thread {
+        private final Thread watchThread;
+        private final Object counterLock = new Object(); // stable monitor guarding sleepCounter
+        private int sleepCounter = sleepCount; // remaining sleep intervals before the interrupt
+
+        TimerThread(Thread watchThread) {
+            this.watchThread = watchThread;
+        }
+
+        public void resetSleepCounter() {
+            synchronized (counterLock) {
+                this.sleepCounter = sleepCount;
+            }
+        }
+
+        public void clearSleepCounter() {
+            synchronized (counterLock) {
+                this.sleepCounter = 0;
+            }
+        }
+
+        private int getCounter() {
+            synchronized (counterLock) {
+                return this.sleepCounter--; // report the current value, then count down
+            }
+        }
+
+        @Override
+        public void run() {
+            try {
+                while (getCounter() > 0) {
+                    Thread.sleep(interval);
+                }
+                watchThread.interrupt();
+            } catch (Exception e) {
+                e.printStackTrace();
+            }
+        }
+    }
+
+    class ReceiveThread extends Thread {
+        private int messagesReceived = 0; // non-null results of listener.recv() counted so far
+
+        @Override
+        public void run() {
+
+            // start the timer thread to interrupt if blocked for too long
+            TimerThread timer = new TimerThread(this);
+            timer.start();
+            while (messagesReceived < messagesExpected) {
+                byte[] message = listener.recv();
+                timer.resetSleepCounter(); // recv() returned, so restart the timer countdown
+                if (message != null)
+                    messagesReceived++;
+                else
+                    break; // a null message ends the loop before all expected messages arrived
+            }
+            timer.clearSleepCounter(); // zero the countdown so the timer loop can exit
+        }
+
+        private boolean moreMessages() {
+            return ((messagesExpected - messagesReceived) > 0); // true while fewer than expected were counted
+        }
+    }
+
+    public Thread newSendThread() {
+        return new SendThread();
+    }
+
+    public Thread newReceiveThread() {
+        return new ReceiveThread();
+    }
+
+    public boolean moreMessages(Thread recvThread) {
+        return ((ReceiveThread) recvThread).moreMessages();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/25c379f2/subprojects/s4-comm/src/test/java/org/apache/s4/comm/SimpleDeliveryTest.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/test/java/org/apache/s4/comm/SimpleDeliveryTest.java b/subprojects/s4-comm/src/test/java/org/apache/s4/comm/SimpleDeliveryTest.java
deleted file mode 100644
index 5423361..0000000
--- a/subprojects/s4-comm/src/test/java/org/apache/s4/comm/SimpleDeliveryTest.java
+++ /dev/null
@@ -1,174 +0,0 @@
-package org.apache.s4.comm;
-
-import org.apache.s4.base.Emitter;
-import org.apache.s4.base.Listener;
-import org.junit.After;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
-
-import com.google.inject.Inject;
-import com.google.inject.name.Named;
-
-/*
- * Test class to test communication protocols. As comm-layer connections need to be
- *  made including acquiring locks, the test is declared abstract and needs to be 
- *  extended with appropriate protocols.
- * 
- * At a high-level, the test accomplishes the following:
- * <ul>
- * <li> Create Send and Receive Threads </li>
- * <li> SendThread sends out a pre-defined number of messages to all the partitions </li>
- * <li> ReceiveThread receives all/most of these messages </li>
- * <li> To avoid the receiveThread waiting for ever, it spawns a TimerThread that would 
- * interrupt after a pre-defined but long enough interval </li>
- * <li> The receive thread reports on number of messages received and dropped </li>
- * </ul>
- * 
- */
-public abstract class SimpleDeliveryTest {
-    protected CommWrapper sdt;
-    protected String lockdir;
-
-    static class CommWrapper {
-        private static final int MESSAGE_COUNT = 200;
-        private static final int TIMER_THREAD_COUNT = 100;
-
-        private final Emitter emitter;
-        private final Listener listener;
-        private final int interval;
-
-        public Thread sendThread, receiveThread;
-        private final int messagesExpected;
-        private int messagesReceived = 0;
-
-        @Inject
-        public CommWrapper(@Named("emitter.send.interval") int interval, Emitter emitter, Listener listener) {
-            this.emitter = emitter;
-            this.listener = listener;
-            this.interval = interval;
-            this.messagesExpected = MESSAGE_COUNT * this.emitter.getPartitionCount();
-
-            this.sendThread = new SendThread();
-            this.receiveThread = new ReceiveThread();
-        }
-
-        public boolean moreMessages() {
-            return ((messagesExpected - messagesReceived) > 0);
-        }
-
-        class SendThread extends Thread {
-            @Override
-            public void run() {
-                try {
-                    for (int partition = 0; partition < emitter.getPartitionCount(); partition++) {
-                        for (int i = 0; i < MESSAGE_COUNT; i++) {
-                            byte[] message = (new String("message-" + i)).getBytes();
-                            emitter.send(partition, message);
-                            Thread.sleep(interval);
-                        }
-                    }
-                } catch (Exception e) {
-                    e.printStackTrace();
-                    return;
-                }
-            }
-        }
-
-        /*
-         * TimerThread - interrupts the passed thread, after specified time-interval.
-         */
-        class TimerThread extends Thread {
-            private final Thread watchThread;
-            private Integer sleepCounter;
-
-            TimerThread(Thread watchThread) {
-                this.watchThread = watchThread;
-                this.sleepCounter = new Integer(TIMER_THREAD_COUNT);
-            }
-
-            public void resetSleepCounter() {
-                synchronized (this.sleepCounter) {
-                    this.sleepCounter = TIMER_THREAD_COUNT;
-                }
-            }
-
-            public void clearSleepCounter() {
-                synchronized (this.sleepCounter) {
-                    this.sleepCounter = 0;
-                }
-            }
-
-            private int getCounter() {
-                synchronized (this.sleepCounter) {
-                    return this.sleepCounter--;
-                }
-            }
-
-            @Override
-            public void run() {
-                try {
-                    while (getCounter() > 0) {
-                        Thread.sleep(interval);
-                    }
-                    watchThread.interrupt();
-                } catch (Exception e) {
-                    e.printStackTrace();
-                }
-            }
-        }
-
-        class ReceiveThread extends Thread {
-            @Override
-            public void run() {
-
-                // start the timer thread to interrupt if blocked for too long
-                TimerThread timer = new TimerThread(this);
-                timer.start();
-                while (messagesReceived < messagesExpected) {
-                    byte[] message = listener.recv();
-                    timer.resetSleepCounter();
-                    if (message != null)
-                        messagesReceived++;
-                    else
-                        break;
-                }
-                timer.clearSleepCounter();
-            }
-        }
-    }
-
-    /*
-     * All tests extending this class need to implement this method
-     */
-    @Before
-    public abstract void setup();
-
-    /**
-     * Tests the protocol. If all components function without throwing exceptions, the test passes. The test also
-     * reports the loss of messages, if any.
-     * 
-     * @throws InterruptedException
-     */
-    @Test
-    public void testCommLayerProtocol() throws InterruptedException {
-        try {
-            // start send and receive threads
-            sdt.sendThread.start();
-            sdt.receiveThread.start();
-
-            // wait for them to finish
-            sdt.sendThread.join();
-            sdt.receiveThread.join();
-
-            Assert.assertTrue("Guaranteed message delivery", !sdt.moreMessages());
-        } catch (Exception e) {
-            e.printStackTrace();
-            Assert.fail("The comm protocol has failed basic functionality test");
-        }
-
-        Assert.assertTrue("The comm protocol seems to be working crash-free", true);
-
-        System.out.println("Done");
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/25c379f2/subprojects/s4-comm/src/test/java/org/apache/s4/comm/TCPCommTest.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/test/java/org/apache/s4/comm/TCPCommTest.java b/subprojects/s4-comm/src/test/java/org/apache/s4/comm/TCPCommTest.java
deleted file mode 100644
index b2ee5e1..0000000
--- a/subprojects/s4-comm/src/test/java/org/apache/s4/comm/TCPCommTest.java
+++ /dev/null
@@ -1,84 +0,0 @@
-package org.apache.s4.comm;
-
-import java.io.InputStream;
-
-import org.apache.commons.configuration.ConfigurationConverter;
-import org.apache.commons.configuration.ConfigurationException;
-import org.apache.commons.configuration.ConfigurationUtils;
-import org.apache.commons.configuration.PropertiesConfiguration;
-import org.apache.s4.base.Emitter;
-import org.apache.s4.base.Hasher;
-import org.apache.s4.base.Listener;
-import org.apache.s4.base.SerializerDeserializer;
-import org.apache.s4.comm.serialize.KryoSerDeser;
-import org.apache.s4.comm.tcp.TCPEmitter;
-import org.apache.s4.comm.tcp.TCPListener;
-import org.apache.s4.comm.topology.Assignment;
-import org.apache.s4.comm.topology.AssignmentFromFile;
-import org.apache.s4.comm.topology.Cluster;
-import org.apache.s4.comm.topology.Topology;
-import org.apache.s4.comm.topology.TopologyFromFile;
-
-import com.google.inject.AbstractModule;
-import com.google.inject.Binder;
-import com.google.inject.Guice;
-import com.google.inject.Injector;
-import com.google.inject.name.Names;
-
-public class TCPCommTest extends SimpleDeliveryTest {
-
-    @Override
-    public void setup() {
-        Injector injector = Guice.createInjector(new NettyTestModule());
-        sdt = injector.getInstance(CommWrapper.class);
-    }
-
-    class NettyTestModule extends AbstractModule {
-
-        protected PropertiesConfiguration config = null;
-
-        private void loadProperties(Binder binder) {
-
-            try {
-                InputStream is = this.getClass().getResourceAsStream("/s4-comm-test.properties");
-                config = new PropertiesConfiguration();
-                config.load(is);
-
-                System.out.println(ConfigurationUtils.toString(config));
-                Names.bindProperties(binder, ConfigurationConverter.getProperties(config));
-            } catch (ConfigurationException e) {
-                binder.addError(e);
-                e.printStackTrace();
-            }
-        }
-
-        @Override
-        protected void configure() {
-            if (config == null)
-                loadProperties(binder());
-
-            int numHosts = config.getList("cluster.hosts").size();
-            boolean isCluster = numHosts > 1 ? true : false;
-            bind(Boolean.class).annotatedWith(Names.named("isCluster")).toInstance(Boolean.valueOf(isCluster));
-
-            bind(Cluster.class);
-
-            bind(Assignment.class).to(AssignmentFromFile.class);
-
-            bind(Topology.class).to(TopologyFromFile.class);
-
-            /* Use a simple UDP comm layer implementation. */
-            bind(Listener.class).to(TCPListener.class);
-            bind(Emitter.class).to(TCPEmitter.class);
-
-            /* The hashing function to map keys top partitions. */
-            bind(Hasher.class).to(DefaultHasher.class);
-
-            /* Use Kryo to serialize events. */
-            bind(SerializerDeserializer.class).to(KryoSerDeser.class);
-
-            bind(Integer.class).annotatedWith(Names.named("emitter.send.interval")).toInstance(
-                    config.getInt("emitter.send.interval"));
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/25c379f2/subprojects/s4-comm/src/test/java/org/apache/s4/comm/UDPCommTest.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/test/java/org/apache/s4/comm/UDPCommTest.java b/subprojects/s4-comm/src/test/java/org/apache/s4/comm/UDPCommTest.java
deleted file mode 100644
index 9d08193..0000000
--- a/subprojects/s4-comm/src/test/java/org/apache/s4/comm/UDPCommTest.java
+++ /dev/null
@@ -1,84 +0,0 @@
-package org.apache.s4.comm;
-
-import java.io.InputStream;
-
-import org.apache.commons.configuration.ConfigurationConverter;
-import org.apache.commons.configuration.ConfigurationException;
-import org.apache.commons.configuration.ConfigurationUtils;
-import org.apache.commons.configuration.PropertiesConfiguration;
-import org.apache.s4.base.Emitter;
-import org.apache.s4.base.Hasher;
-import org.apache.s4.base.Listener;
-import org.apache.s4.base.SerializerDeserializer;
-import org.apache.s4.comm.serialize.KryoSerDeser;
-import org.apache.s4.comm.topology.Assignment;
-import org.apache.s4.comm.topology.AssignmentFromFile;
-import org.apache.s4.comm.topology.Cluster;
-import org.apache.s4.comm.topology.Topology;
-import org.apache.s4.comm.topology.TopologyFromFile;
-import org.apache.s4.comm.udp.UDPEmitter;
-import org.apache.s4.comm.udp.UDPListener;
-
-import com.google.inject.AbstractModule;
-import com.google.inject.Binder;
-import com.google.inject.Guice;
-import com.google.inject.Injector;
-import com.google.inject.name.Names;
-
-public class UDPCommTest extends SimpleDeliveryTest {
-
-    @Override
-    public void setup() {
-        Injector injector = Guice.createInjector(new UDPCommTestModule());
-        sdt = injector.getInstance(CommWrapper.class);
-    }
-
-    class UDPCommTestModule extends AbstractModule {
-
-        protected PropertiesConfiguration config = null;
-
-        private void loadProperties(Binder binder) {
-
-            try {
-                InputStream is = this.getClass().getResourceAsStream("/s4-comm-test.properties");
-                config = new PropertiesConfiguration();
-                config.load(is);
-
-                System.out.println(ConfigurationUtils.toString(config));
-                Names.bindProperties(binder, ConfigurationConverter.getProperties(config));
-            } catch (ConfigurationException e) {
-                binder.addError(e);
-                e.printStackTrace();
-            }
-        }
-
-        @Override
-        protected void configure() {
-            if (config == null)
-                loadProperties(binder());
-
-            int numHosts = config.getList("cluster.hosts").size();
-            boolean isCluster = numHosts > 1 ? true : false;
-            bind(Boolean.class).annotatedWith(Names.named("isCluster")).toInstance(Boolean.valueOf(isCluster));
-
-            bind(Cluster.class);
-
-            bind(Assignment.class).to(AssignmentFromFile.class);
-
-            bind(Topology.class).to(TopologyFromFile.class);
-
-            /* Use a simple UDP comm layer implementation. */
-            bind(Listener.class).to(UDPListener.class);
-            bind(Emitter.class).to(UDPEmitter.class);
-
-            /* The hashing function to map keys top partitions. */
-            bind(Hasher.class).to(DefaultHasher.class);
-
-            /* Use Kryo to serialize events. */
-            bind(SerializerDeserializer.class).to(KryoSerDeser.class);
-
-            bind(Integer.class).annotatedWith(Names.named("emitter.send.interval")).toInstance(
-                    config.getInt("emitter.send.interval"));
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/25c379f2/subprojects/s4-comm/src/test/java/org/apache/s4/comm/tcp/TCPCommTest.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/test/java/org/apache/s4/comm/tcp/TCPCommTest.java b/subprojects/s4-comm/src/test/java/org/apache/s4/comm/tcp/TCPCommTest.java
new file mode 100644
index 0000000..f02bf64
--- /dev/null
+++ b/subprojects/s4-comm/src/test/java/org/apache/s4/comm/tcp/TCPCommTest.java
@@ -0,0 +1,69 @@
+package org.apache.s4.comm.tcp;
+
+import java.io.IOException;
+import org.apache.s4.comm.DeliveryTestUtil;
+import org.apache.s4.comm.tcp.TCPEmitter;
+import org.apache.s4.comm.tcp.TCPListener;
+import org.apache.s4.fixtures.ZkBasedClusterManagementTestModule;
+import org.apache.s4.fixtures.ZkBasedTest;
+import org.apache.zookeeper.KeeperException;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import com.google.inject.Guice;
+import com.google.inject.Injector;
+import com.google.inject.name.Names;
+
+public class TCPCommTest extends ZkBasedTest {
+    DeliveryTestUtil util;
+
+    @Before
+    public void setup() throws IOException, InterruptedException, KeeperException {
+        Injector injector = Guice.createInjector(new TCPCommTestModule());
+        util = injector.getInstance(DeliveryTestUtil.class);
+    }
+
+    class TCPCommTestModule extends ZkBasedClusterManagementTestModule {
+        TCPCommTestModule() {
+            super(TCPEmitter.class, TCPListener.class);
+        }
+
+        @Override
+        protected void configure() {
+            super.configure();
+            bind(Integer.class).annotatedWith(Names.named("emitter.send.interval")).toInstance(100);
+            bind(Integer.class).annotatedWith(Names.named("emitter.send.numMessages")).toInstance(200);
+            bind(Integer.class).annotatedWith(Names.named("listener.recv.sleepCount")).toInstance(10);
+        }
+    }
+
+    /**
+     * Tests the protocol. The test passes only if no exception is thrown and the receive thread has counted
+     * every expected message; any lost message fails the "Guaranteed message delivery" assertion.
+     * 
+     * @throws InterruptedException
+     */
+    @Test
+    public void testTCPDelivery() throws InterruptedException {
+        try {
+            Thread sendThread = util.newSendThread();
+            Thread receiveThread = util.newReceiveThread();
+
+            // start send and receive threads
+            sendThread.start();
+            receiveThread.start();
+
+            // wait for them to finish
+            sendThread.join();
+            receiveThread.join();
+
+            Assert.assertTrue("Guaranteed message delivery", !util.moreMessages(receiveThread));
+        } catch (Exception e) {
+            e.printStackTrace();
+            Assert.fail("TCP has failed basic functionality test");
+        }
+
+        System.out.println("Done");
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/25c379f2/subprojects/s4-comm/src/test/java/org/apache/s4/comm/topology/AssignmentFromZKTest.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/test/java/org/apache/s4/comm/topology/AssignmentFromZKTest.java b/subprojects/s4-comm/src/test/java/org/apache/s4/comm/topology/AssignmentFromZKTest.java
index a82a490..d733e9b 100644
--- a/subprojects/s4-comm/src/test/java/org/apache/s4/comm/topology/AssignmentFromZKTest.java
+++ b/subprojects/s4-comm/src/test/java/org/apache/s4/comm/topology/AssignmentFromZKTest.java
@@ -19,6 +19,7 @@ public class AssignmentFromZKTest extends ZKBaseTest {
         for (int i = 0; i < 10; i++) {
             Runnable runnable = new Runnable() {
 
+                @SuppressWarnings("unused")
                 @Override
                 public void run() {
                     AssignmentFromZK assignmentFromZK;

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/25c379f2/subprojects/s4-comm/src/test/java/org/apache/s4/comm/topology/TopologyFromZKTest.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/test/java/org/apache/s4/comm/topology/TopologyFromZKTest.java b/subprojects/s4-comm/src/test/java/org/apache/s4/comm/topology/TopologyFromZKTest.java
index 6c95784..eb5f42c 100644
--- a/subprojects/s4-comm/src/test/java/org/apache/s4/comm/topology/TopologyFromZKTest.java
+++ b/subprojects/s4-comm/src/test/java/org/apache/s4/comm/topology/TopologyFromZKTest.java
@@ -44,6 +44,7 @@ public class TopologyFromZKTest extends ZKBaseTest {
         for (int i = 0; i < 10; i++) {
             Runnable runnable = new Runnable() {
 
+                @SuppressWarnings("unused")
                 @Override
                 public void run() {
                     AssignmentFromZK assignmentFromZK;

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/25c379f2/subprojects/s4-comm/src/test/java/org/apache/s4/comm/udp/UDPCommTest.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/test/java/org/apache/s4/comm/udp/UDPCommTest.java b/subprojects/s4-comm/src/test/java/org/apache/s4/comm/udp/UDPCommTest.java
new file mode 100644
index 0000000..808a357
--- /dev/null
+++ b/subprojects/s4-comm/src/test/java/org/apache/s4/comm/udp/UDPCommTest.java
@@ -0,0 +1,65 @@
+package org.apache.s4.comm.udp;
+
+import java.io.IOException;
+
+import org.apache.s4.comm.DeliveryTestUtil;
+import org.apache.s4.fixtures.ZkBasedClusterManagementTestModule;
+import org.apache.s4.fixtures.ZkBasedTest;
+import org.apache.zookeeper.KeeperException;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import com.google.inject.Guice;
+import com.google.inject.Injector;
+import com.google.inject.name.Names;
+
+public class UDPCommTest extends ZkBasedTest {
+    DeliveryTestUtil util;
+
+    @Before
+    public void setup() throws IOException, InterruptedException, KeeperException {
+        Injector injector = Guice.createInjector(new UDPCommTestModule());
+        util = injector.getInstance(DeliveryTestUtil.class);
+    }
+
+    class UDPCommTestModule extends ZkBasedClusterManagementTestModule {
+        UDPCommTestModule() {
+            super(UDPEmitter.class, UDPListener.class);
+        }
+
+        @Override
+        protected void configure() {
+            super.configure();
+            bind(Integer.class).annotatedWith(Names.named("emitter.send.interval")).toInstance(100);
+            bind(Integer.class).annotatedWith(Names.named("emitter.send.numMessages")).toInstance(200);
+            bind(Integer.class).annotatedWith(Names.named("listener.recv.sleepCount")).toInstance(10);
+        }
+    }
+
+    /**
+     * Tests the protocol. If all components function without throwing exceptions, the test passes.
+     * 
+     * @throws InterruptedException
+     */
+    @Test
+    public void testUDPDelivery() throws InterruptedException {
+        try {
+            Thread sendThread = util.newSendThread();
+            Thread receiveThread = util.newReceiveThread();
+
+            // start send and receive threads
+            sendThread.start();
+            receiveThread.start();
+
+            // wait for them to finish
+            sendThread.join();
+            receiveThread.join();
+
+        } catch (Exception e) {
+            e.printStackTrace();
+            Assert.fail("UDP has failed basic functionality test");
+        }
+        System.out.println("Done");
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/25c379f2/subprojects/s4-comm/src/test/java/org/apache/s4/fixtures/FileBasedClusterManagementTestModule.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/test/java/org/apache/s4/fixtures/FileBasedClusterManagementTestModule.java b/subprojects/s4-comm/src/test/java/org/apache/s4/fixtures/FileBasedClusterManagementTestModule.java
index b189350..ebcf388 100644
--- a/subprojects/s4-comm/src/test/java/org/apache/s4/fixtures/FileBasedClusterManagementTestModule.java
+++ b/subprojects/s4-comm/src/test/java/org/apache/s4/fixtures/FileBasedClusterManagementTestModule.java
@@ -16,7 +16,6 @@ import org.apache.s4.comm.DefaultHasher;
 import org.apache.s4.comm.serialize.KryoSerDeser;
 import org.apache.s4.comm.topology.Assignment;
 import org.apache.s4.comm.topology.AssignmentFromFile;
-import org.apache.s4.comm.topology.AssignmentFromZK;
 import org.apache.s4.comm.topology.Cluster;
 import org.apache.s4.comm.topology.Topology;
 import org.apache.s4.comm.topology.TopologyFromFile;

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/25c379f2/subprojects/s4-comm/src/test/java/org/apache/s4/fixtures/ZkBasedClusterManagementTestModule.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/test/java/org/apache/s4/fixtures/ZkBasedClusterManagementTestModule.java b/subprojects/s4-comm/src/test/java/org/apache/s4/fixtures/ZkBasedClusterManagementTestModule.java
index be3739b..a368db8 100644
--- a/subprojects/s4-comm/src/test/java/org/apache/s4/fixtures/ZkBasedClusterManagementTestModule.java
+++ b/subprojects/s4-comm/src/test/java/org/apache/s4/fixtures/ZkBasedClusterManagementTestModule.java
@@ -1,8 +1,6 @@
 package org.apache.s4.fixtures;
 
 import java.io.InputStream;
-import java.lang.reflect.ParameterizedType;
-import java.lang.reflect.Type;
 
 import org.apache.commons.configuration.ConfigurationConverter;
 import org.apache.commons.configuration.ConfigurationException;
@@ -21,25 +19,25 @@ import org.apache.s4.comm.topology.AssignmentFromZK;
 import org.apache.s4.comm.topology.Cluster;
 import org.apache.s4.comm.topology.Topology;
 import org.apache.s4.comm.topology.TopologyFromZK;
-import org.apache.s4.comm.udp.UDPEmitter;
-import org.apache.s4.comm.udp.UDPListener;
 
 import com.google.inject.AbstractModule;
 import com.google.inject.Binder;
 import com.google.inject.name.Names;
 
-// also uses netty
-public class ZkBasedClusterManagementTestModule<T> extends AbstractModule {
+public class ZkBasedClusterManagementTestModule extends AbstractModule {
 
     protected PropertiesConfiguration config = null;
-    private final Class<?> appClass;
+
+    private Class<? extends Emitter> emitterClass = null;
+    private Class<? extends Listener> listenerClass = null;
 
     protected ZkBasedClusterManagementTestModule() {
-        // infer actual app class through "super type tokens" (this simple code
-        // assumes actual module class is a direct subclass from this one)
-        ParameterizedType pt = (ParameterizedType) getClass().getGenericSuperclass();
-        Type[] fieldArgTypes = pt.getActualTypeArguments();
-        this.appClass = (Class<?>) fieldArgTypes[0];
+    }
+
+    protected ZkBasedClusterManagementTestModule(Class<? extends Emitter> emitterClass,
+            Class<? extends Listener> listenerClass) {
+        this.emitterClass = emitterClass;
+        this.listenerClass = listenerClass;
     }
 
     private void loadProperties(Binder binder) {
@@ -48,8 +46,10 @@ public class ZkBasedClusterManagementTestModule<T> extends AbstractModule {
             InputStream is = this.getClass().getResourceAsStream("/default.s4.properties");
             config = new PropertiesConfiguration();
             config.load(is);
-            config.setProperty("cluster.zk_address",
-                    config.getString("cluster.zk_address").replaceFirst("\\w+:\\d+", "localhost:" + CommTestUtils.ZK_PORT));
+            config.setProperty(
+                    "cluster.zk_address",
+                    config.getString("cluster.zk_address").replaceFirst("\\w+:\\d+",
+                            "localhost:" + CommTestUtils.ZK_PORT));
             System.out.println(ConfigurationUtils.toString(config));
             // TODO - validate properties.
 
@@ -66,14 +66,22 @@ public class ZkBasedClusterManagementTestModule<T> extends AbstractModule {
         if (config == null) {
             loadProperties(binder());
         }
-        bind(appClass);
         bind(Cluster.class);
         bind(Hasher.class).to(DefaultHasher.class);
         bind(SerializerDeserializer.class).to(KryoSerDeser.class);
         bind(Assignment.class).to(AssignmentFromZK.class);
         bind(Topology.class).to(TopologyFromZK.class);
-        bind(Emitter.class).to(TCPEmitter.class);
-        bind(Listener.class).to(TCPListener.class);
-    }
 
+        if (this.emitterClass != null) {
+            bind(Emitter.class).to(this.emitterClass);
+        } else {
+            bind(Emitter.class).to(TCPEmitter.class);
+        }
+
+        if (this.listenerClass != null) {
+            bind(Listener.class).to(this.listenerClass);
+        } else {
+            bind(Listener.class).to(TCPListener.class);
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/25c379f2/subprojects/s4-comm/src/test/java/org/apache/s4/fixtures/ZkBasedTest.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/test/java/org/apache/s4/fixtures/ZkBasedTest.java b/subprojects/s4-comm/src/test/java/org/apache/s4/fixtures/ZkBasedTest.java
new file mode 100644
index 0000000..02c5467
--- /dev/null
+++ b/subprojects/s4-comm/src/test/java/org/apache/s4/fixtures/ZkBasedTest.java
@@ -0,0 +1,48 @@
+package org.apache.s4.fixtures;
+
+import java.io.File;
+
+import org.I0Itec.zkclient.IDefaultNameSpace;
+import org.I0Itec.zkclient.ZkClient;
+import org.I0Itec.zkclient.ZkServer;
+import org.apache.s4.comm.tools.TaskSetup;
+import org.junit.Before;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public abstract class ZkBasedTest {
+    private static final Logger logger = LoggerFactory.getLogger(ZkBasedTest.class);
+    private ZkServer zkServer;
+
+    @Before
+    public void prepare() {
+        String dataDir = CommTestUtils.DEFAULT_TEST_OUTPUT_DIR + File.separator + "zookeeper" + File.separator + "data";
+        String logDir = CommTestUtils.DEFAULT_TEST_OUTPUT_DIR + File.separator + "zookeeper" + File.separator + "logs";
+        CommTestUtils.cleanupTmpDirs();
+
+        IDefaultNameSpace defaultNameSpace = new IDefaultNameSpace() {
+
+            @Override
+            public void createDefaultNameSpace(ZkClient zkClient) {
+
+            }
+        };
+
+        logger.info("Starting Zookeeper Server");
+        zkServer = new ZkServer(dataDir, logDir, defaultNameSpace, CommTestUtils.ZK_PORT);
+        zkServer.start();
+
+        logger.info("Starting Zookeeper Client 1");
+        String zookeeperAddress = "localhost:" + CommTestUtils.ZK_PORT;
+        @SuppressWarnings("unused")
+        ZkClient zkClient = new ZkClient(zookeeperAddress, 10000, 10000);
+
+        ZkClient zkClient2 = new ZkClient(zookeeperAddress, 10000, 10000);
+        zkClient2.getCreationTime("/");
+
+        TaskSetup taskSetup = new TaskSetup(zookeeperAddress);
+        final String clusterName = "s4-test-cluster";
+        taskSetup.clean(clusterName);
+        taskSetup.setup(clusterName, 1);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/25c379f2/subprojects/s4-comm/src/test/resources/default.s4.properties
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/test/resources/default.s4.properties b/subprojects/s4-comm/src/test/resources/default.s4.properties
new file mode 100644
index 0000000..0e31dfa
--- /dev/null
+++ b/subprojects/s4-comm/src/test/resources/default.s4.properties
@@ -0,0 +1,9 @@
+comm.queue_emmiter_size = 8000
+comm.queue_listener_size = 8000
+cluster.hosts = localhost
+cluster.ports = 5077
+cluster.lock_dir = {user.dir}/tmp
+cluster.name = s4-test-cluster
+cluster.zk_address = localhost:2181
+cluster.zk_session_timeout = 10000
+cluster.zk_connection_timeout = 10000
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/25c379f2/subprojects/s4-core/src/test/java/org/apache/s4/fixtures/ZkBasedAppModule.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/test/java/org/apache/s4/fixtures/ZkBasedAppModule.java b/subprojects/s4-core/src/test/java/org/apache/s4/fixtures/ZkBasedAppModule.java
new file mode 100644
index 0000000..cff6474
--- /dev/null
+++ b/subprojects/s4-core/src/test/java/org/apache/s4/fixtures/ZkBasedAppModule.java
@@ -0,0 +1,35 @@
+package org.apache.s4.fixtures;
+
+import java.lang.reflect.ParameterizedType;
+import java.lang.reflect.Type;
+
+import org.apache.s4.base.Emitter;
+import org.apache.s4.base.Listener;
+
+public class ZkBasedAppModule<T> extends ZkBasedClusterManagementTestModule {
+    private final Class<?> appClass;
+
+    private Class<?> findAppClass() {
+        // infer actual app class through "super type tokens" (this simple code
+        // assumes actual module class is a direct subclass from this one)
+        ParameterizedType pt = (ParameterizedType) getClass().getGenericSuperclass();
+        Type[] fieldArgTypes = pt.getActualTypeArguments();
+        return (Class<?>) fieldArgTypes[0];
+    }
+
+    protected ZkBasedAppModule() {
+        super();
+        this.appClass = findAppClass();
+    }
+
+    protected ZkBasedAppModule(Class<? extends Emitter> emitterClass, Class<? extends Listener> listenerClass) {
+        super(emitterClass, listenerClass);
+        this.appClass = findAppClass();
+    }
+
+    @Override
+    protected void configure() {
+        super.configure();
+        bind(appClass);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/25c379f2/subprojects/s4-core/src/test/java/org/apache/s4/wordcount/zk/WordCountModuleZk.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/test/java/org/apache/s4/wordcount/zk/WordCountModuleZk.java b/subprojects/s4-core/src/test/java/org/apache/s4/wordcount/zk/WordCountModuleZk.java
index 87814c5..322690a 100644
--- a/subprojects/s4-core/src/test/java/org/apache/s4/wordcount/zk/WordCountModuleZk.java
+++ b/subprojects/s4-core/src/test/java/org/apache/s4/wordcount/zk/WordCountModuleZk.java
@@ -1,9 +1,7 @@
 package org.apache.s4.wordcount.zk;
 
-import org.apache.s4.fixtures.ZkBasedClusterManagementTestModule;
+import org.apache.s4.fixtures.ZkBasedAppModule;
 import org.apache.s4.wordcount.WordCountApp;
 
-
-public class WordCountModuleZk extends ZkBasedClusterManagementTestModule<WordCountApp> {
-
+public class WordCountModuleZk extends ZkBasedAppModule<WordCountApp> {
 }

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/25c379f2/subprojects/s4-core/src/test/java/org/apache/s4/wordcount/zk/WordCountTestZk.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/test/java/org/apache/s4/wordcount/zk/WordCountTestZk.java b/subprojects/s4-core/src/test/java/org/apache/s4/wordcount/zk/WordCountTestZk.java
index 3db216b..fb1d249 100644
--- a/subprojects/s4-core/src/test/java/org/apache/s4/wordcount/zk/WordCountTestZk.java
+++ b/subprojects/s4-core/src/test/java/org/apache/s4/wordcount/zk/WordCountTestZk.java
@@ -1,83 +1,22 @@
 package org.apache.s4.wordcount.zk;
 
 import static org.apache.s4.wordcount.WordCountTest.*;
-import static org.junit.Assert.*;
 import java.io.File;
 import java.util.concurrent.CountDownLatch;
 
 import junit.framework.Assert;
 
-import org.I0Itec.zkclient.IDefaultNameSpace;
-import org.I0Itec.zkclient.ZkClient;
-import org.I0Itec.zkclient.ZkServer;
-import org.apache.s4.comm.tools.TaskSetup;
-import org.apache.s4.comm.topology.AssignmentFromZK;
-import org.apache.s4.comm.topology.ClusterNode;
 import org.apache.s4.core.App;
 import org.apache.s4.fixtures.CommTestUtils;
+import org.apache.s4.fixtures.ZkBasedTest;
 import org.apache.s4.wordcount.WordCountApp;
-import org.apache.s4.wordcount.WordCountModule;
 import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.ZooKeeper;
 import org.apache.zookeeper.ZooDefs.Ids;
-import org.junit.Before;
-import org.junit.Test;
-
-
-public class WordCountTestZk {
-
-    private ZkServer zkServer;
-    private ZkClient zkClient;
-
-    @Before
-    public void prepare() {
-
-        String dataDir = CommTestUtils.DEFAULT_TEST_OUTPUT_DIR + File.separator + "zookeeper" + File.separator + "data";
-        String logDir = CommTestUtils.DEFAULT_TEST_OUTPUT_DIR + File.separator + "zookeeper" + File.separator + "logs";
-        CommTestUtils.cleanupTmpDirs();
-
-        IDefaultNameSpace defaultNameSpace = new IDefaultNameSpace() {
 
-            @Override
-            public void createDefaultNameSpace(ZkClient zkClient) {
-
-            }
-        };
-
-        zkServer = new ZkServer(dataDir, logDir, defaultNameSpace, CommTestUtils.ZK_PORT);
-        zkServer.start();
-
-        // zkClient = zkServer.getZkClient();
-        String zookeeperAddress = "localhost:" + CommTestUtils.ZK_PORT;
-        zkClient = new ZkClient(zookeeperAddress, 10000, 10000);
-
-        ZkClient zkClient2 = new ZkClient(zookeeperAddress, 10000, 10000);
-        zkClient2.getCreationTime("/");
-        TaskSetup taskSetup = new TaskSetup(zookeeperAddress);
-        final String clusterName = "s4-test-cluster";
-        taskSetup.clean(clusterName);
-        taskSetup.setup(clusterName, 1);
-        // final CountDownLatch latch = new CountDownLatch(10);
-        // for (int i = 0; i < 10; i++) {
-        // Runnable runnable = new Runnable() {
-        //
-        // @Override
-        // public void run() {
-        // AssignmentFromZK assignmentFromZK;
-        // try {
-        // assignmentFromZK = new AssignmentFromZK(clusterName, zookeeperAddress, 30000, 30000);
-        // ClusterNode assignClusterNode = assignmentFromZK.assignClusterNode();
-        // latch.countDown();
-        // } catch (Exception e) {
-        // e.printStackTrace();
-        // }
-        // }
-        // };
-        // Thread t = new Thread(runnable);
-        // t.start();
-        // }
-    }
+import org.junit.Test;
 
+public class WordCountTestZk extends ZkBasedTest {
     @Test
     public void test() throws Exception {
 
@@ -99,6 +38,5 @@ public class WordCountTestZk {
         File results = new File(CommTestUtils.DEFAULT_TEST_OUTPUT_DIR + File.separator + "wordcount");
         String s = CommTestUtils.readFile(results);
         Assert.assertEquals("be=2;da=2;doobie=5;not=1;or=1;to=2;", s);
-
     }
 }