You are viewing a plain-text version of this content. The canonical link for it was a hyperlink ("here") that is not preserved in this plain-text version.
Posted to s4-commits@incubator.apache.org by mm...@apache.org on 2012/06/15 16:06:01 UTC

[1/22] git commit: Merge branch 'S4-22' into piper

Updated Branches:
  refs/heads/piper f296947a8 -> 121f33066


Merge branch 'S4-22' into piper


Project: http://git-wip-us.apache.org/repos/asf/incubator-s4/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-s4/commit/121f3306
Tree: http://git-wip-us.apache.org/repos/asf/incubator-s4/tree/121f3306
Diff: http://git-wip-us.apache.org/repos/asf/incubator-s4/diff/121f3306

Branch: refs/heads/piper
Commit: 121f33066dc4bf02bd37c618312c97874c6c73c4
Parents: f296947 0338484
Author: Matthieu Morel <mm...@apache.org>
Authored: Fri Jun 15 17:52:49 2012 +0200
Committer: Matthieu Morel <mm...@apache.org>
Committed: Fri Jun 15 17:52:49 2012 +0200

----------------------------------------------------------------------
 build.gradle                                       |   63 ++---
 gradle/wrapper/gradle-wrapper.properties           |    2 +-
 gradlew                                            |    7 +-
 gradlew.bat                                        |    2 +-
 lib/gradle-base-services-1.0-rc-3.jar              |  Bin 0 -> 31974 bytes
 lib/gradle-core-1.0-rc-3.jar                       |  Bin 0 -> 1643485 bytes
 lib/gradle-tooling-api-1.0-rc-3.jar                |  Bin 0 -> 122911 bytes
 lib/gradle-wrapper-1.0-rc-3.jar                    |  Bin 0 -> 39860 bytes
 lib/gradle-wrapper-1.0-rc-3.properties             |    6 +
 s4                                                 |   24 ++
 settings.gradle                                    |    5 +
 .../src/main/java/org/apache/s4/base/Emitter.java  |    2 +-
 .../src/main/java/org/apache/s4/base/Event.java    |   22 +-
 .../main/java/org/apache/s4/base/EventMessage.java |   52 +++
 .../java/org/apache/s4/base/RemoteEmitter.java     |    5 +
 .../org/apache/s4/base/util/MultiClassLoader.java  |   31 +-
 subprojects/s4-comm/s4-comm.gradle                 |    4 +-
 .../java/org/apache/s4/comm/DefaultCommModule.java |  129 ++++++++
 .../src/main/java/org/apache/s4/comm/Module.java   |  111 -------
 .../java/org/apache/s4/comm/QueueingEmitter.java   |   12 +-
 .../org/apache/s4/comm/RemoteEmitterFactory.java   |   10 +
 .../apache/s4/comm/loopback/LoopBackEmitter.java   |   12 +-
 .../org/apache/s4/comm/serialize/KryoSerDeser.java |   49 ++--
 .../org/apache/s4/comm/tcp/RemoteEmitters.java     |   34 ++
 .../java/org/apache/s4/comm/tcp/TCPEmitter.java    |   48 +++-
 .../java/org/apache/s4/comm/tcp/TCPListener.java   |   12 +
 .../org/apache/s4/comm/tcp/TCPRemoteEmitter.java   |   27 ++
 .../java/org/apache/s4/comm/tools/TaskSetup.java   |   54 ++--
 .../org/apache/s4/comm/topology/Assignment.java    |    8 +-
 .../s4/comm/topology/AssignmentFromFile.java       |  127 --------
 .../apache/s4/comm/topology/AssignmentFromZK.java  |   45 ++-
 .../java/org/apache/s4/comm/topology/Cluster.java  |  139 +--------
 .../s4/comm/topology/ClusterChangeListener.java    |   10 +
 .../org/apache/s4/comm/topology/ClusterFromZK.java |  198 ++++++++++++
 .../java/org/apache/s4/comm/topology/Clusters.java |   12 +
 .../apache/s4/comm/topology/ClustersFromZK.java    |  103 ++++++
 .../apache/s4/comm/topology/PhysicalCluster.java   |  125 ++++++++
 .../org/apache/s4/comm/topology/RemoteCluster.java |   10 +
 .../org/apache/s4/comm/topology/RemoteStreams.java |  219 +++++++++++++
 .../apache/s4/comm/topology/StreamConsumer.java    |   55 ++++
 .../java/org/apache/s4/comm/topology/Topology.java |    9 -
 .../s4/comm/topology/TopologyChangeListener.java   |    5 -
 .../apache/s4/comm/topology/TopologyFromFile.java  |   33 --
 .../apache/s4/comm/topology/TopologyFromZK.java    |  144 ---------
 .../java/org/apache/s4/comm/topology/ZNRecord.java |   17 +-
 .../java/org/apache/s4/comm/udp/UDPEmitter.java    |   28 +-
 .../org/apache/s4/comm/udp/UDPRemoteEmitter.java   |   18 +
 .../src/main/resources/default.s4.comm.properties  |   12 +
 .../s4-comm/src/main/resources/s4-comm.properties  |    9 -
 .../java/org/apache/s4/comm/DeliveryTestUtil.java  |  141 +++++++++
 .../s4/comm/tcp/MultiPartitionDeliveryTest.java    |   10 +-
 .../org/apache/s4/comm/tcp/NetworkGlitchTest.java  |   11 +-
 .../org/apache/s4/comm/tcp/SimpleDeliveryTest.java |   10 +-
 .../java/org/apache/s4/comm/tcp/TCPBasedTest.java  |   48 ---
 .../java/org/apache/s4/comm/tcp/TCPCommTest.java   |   64 ++++
 .../s4/comm/topology/AssignmentFromZKTest.java     |   42 ---
 .../s4/comm/topology/AssignmentsFromZKTest.java    |   69 ++++
 .../s4/comm/topology/ClustersFromZKTest.java       |   97 ++++++
 .../s4/comm/topology/TopologyFromZKTest.java       |   78 -----
 .../org/apache/s4/comm/topology/ZKBaseTest.java    |   37 +--
 .../s4/comm/udp/MultiPartitionDeliveryTest.java    |    6 +-
 .../org/apache/s4/comm/udp/SimpleDeliveryTest.java |    6 +-
 .../java/org/apache/s4/comm/udp/UDPBasedTest.java  |   50 ---
 .../java/org/apache/s4/comm/udp/UDPCommTest.java   |   65 ++++
 .../org/apache/s4/comm/util/PartitionInfo.java     |   11 +-
 .../org/apache/s4/comm/util/ProtocolTestUtil.java  |    7 +-
 .../java/org/apache/s4/fixtures/CommTestUtils.java |    6 +-
 .../FileBasedClusterManagementTestModule.java      |   81 -----
 .../ZkBasedClusterManagementTestModule.java        |   92 ------
 .../java/org/apache/s4/fixtures/ZkBasedTest.java   |   39 +--
 .../src/test/resources/default.s4.properties       |   13 -
 .../src/test/resources/s4-comm-test.properties     |   10 -
 .../src/test/resources/udp.s4.comm.properties      |   12 +
 subprojects/s4-core/s4-core.gradle                 |   25 +-
 .../src/main/java/org/apache/s4/core/App.java      |  240 ++++++++------
 .../main/java/org/apache/s4/core/CustomModule.java |  103 ------
 .../java/org/apache/s4/core/DefaultCoreModule.java |   76 +++++
 .../main/java/org/apache/s4/core/EventSource.java  |   58 ++--
 .../src/main/java/org/apache/s4/core/Key.java      |   52 +++
 .../src/main/java/org/apache/s4/core/Main.java     |  194 +++++++-----
 .../java/org/apache/s4/core/ProcessingElement.java |  129 ++++----
 .../src/main/java/org/apache/s4/core/Receiver.java |   24 +-
 .../main/java/org/apache/s4/core/RemoteSender.java |   28 ++
 .../java/org/apache/s4/core/RemoteSenders.java     |   62 ++++
 .../main/java/org/apache/s4/core/RemoteStream.java |   77 +++++
 .../src/main/java/org/apache/s4/core/Sender.java   |   18 +-
 .../src/main/java/org/apache/s4/core/Server.java   |   84 +++--
 .../src/main/java/org/apache/s4/core/Stream.java   |  133 ++++-----
 .../main/java/org/apache/s4/core/Streamable.java   |   18 +-
 .../main/java/org/apache/s4/core/WindowingPE.java  |   23 +-
 .../org/apache/s4/core/adapter/AdapterApp.java     |   52 +++
 .../s4/core/util/ParametersInjectionModule.java    |   28 ++
 .../s4/deploy/DistributedDeploymentManager.java    |    6 +-
 .../src/main/resources/apps/CounterExample.s4r     |  Bin 53071 -> 0 bytes
 .../src/main/resources/default.s4.core.properties  |    2 +
 .../s4-core/src/main/resources/s4-core.properties  |    5 -
 .../test/java/org/apache/s4/core/TriggerTest.java  |   20 +-
 .../apache/s4/core/triggers/TriggeredModule.java   |    6 -
 .../apache/s4/deploy/TestAutomaticDeployment.java  |  116 +++-----
 .../test/java/org/apache/s4/deploy/TestModule.java |    5 +-
 .../s4/deploy/prodcon/TestProducerConsumer.java    |  165 ++++++++++
 .../java/org/apache/s4/fixtures/CoreTestUtils.java |   85 +++---
 .../java/org/apache/s4/fixtures/SocketAdapter.java |   27 +-
 .../org/apache/s4/fixtures/ZkBasedAppModule.java   |   35 --
 .../org/apache/s4/wordcount/WordClassifierPE.java  |   19 +-
 .../org/apache/s4/wordcount/WordCountModule.java   |   10 +-
 .../org/apache/s4/wordcount/WordCountTest.java     |   10 +-
 .../apache/s4/wordcount/zk/WordCountModuleZk.java  |    7 -
 .../apache/s4/wordcount/zk/WordCountTestZk.java    |   46 ---
 .../src/test/resources/default.s4.properties       |   10 -
 .../resources/org.apache.s4.deploy.s4.properties   |   15 -
 .../test/resources/s4-counter-example.properties   |    7 -
 subprojects/s4-edsl/s4-edsl.gradle                 |    2 +-
 .../main/java/org/apache/s4/edsl/AppBuilder.java   |    4 +-
 .../src/test/java/org/apache/s4/edsl/Module.java   |  105 -------
 .../src/test/java/org/apache/s4/edsl/MyApp.java    |   13 -
 .../src/test/java/org/apache/s4/edsl/TestEDSL.java |   12 +-
 .../java/org/apache/s4/example/counter/Module.java |   81 ++---
 .../java/org/apache/s4/example/counter/MyApp.java  |   46 +--
 .../apache/s4/example/edsl/counter/CounterApp.java |  199 ++++++------
 .../org/apache/s4/example/edsl/counter/Module.java |  105 -------
 .../java/org/apache/s4/example/model/Main.java     |   18 +-
 .../java/org/apache/s4/example/model/MyApp.java    |   12 +-
 subprojects/s4-tools/s4-tools.gradle               |   73 +++++
 .../main/java/org/apache/s4/tools/CreateApp.java   |  146 +++++++++
 .../java/org/apache/s4/tools/DefineCluster.java    |   48 +++
 .../src/main/java/org/apache/s4/tools/Deploy.java  |  151 +++++++++
 .../org/apache/s4/tools/FileExistsValidator.java   |   18 +
 .../src/main/java/org/apache/s4/tools/Package.java |   44 +++
 .../main/java/org/apache/s4/tools/S4ArgsBase.java  |   33 ++
 .../src/main/java/org/apache/s4/tools/Tools.java   |  113 +++++++
 .../main/java/org/apache/s4/tools/ZKServer.java    |   66 ++++
 .../src/main/resources/templates/HelloApp.java.txt |   34 ++
 .../resources/templates/HelloInputAdapter.java.txt |   60 ++++
 .../src/main/resources/templates/HelloPE.java.txt  |   29 ++
 .../src/main/resources/templates/build.gradle      |  175 +++++++++++
 .../s4-tools/src/main/resources/templates/gradlew  |    2 +
 .../src/main/resources/templates/newApp.README     |   37 +++
 .../s4-tools/src/main/resources/templates/s4       |   17 +
 .../src/main/resources/templates/settings.gradle   |    1 +
 test-apps/s4-counter/build.gradle                  |  224 ++++++++++++++
 .../s4-counter/src/main/java/s4app/ClockApp.java   |   38 +++
 .../s4-counter/src/main/java/s4app/ClockPE.java    |   49 +++
 test-apps/s4-showtime/build.gradle                 |  222 +++++++++++++
 .../s4-showtime/src/main/java/s4app/ShowPE.java    |   33 ++
 .../src/main/java/s4app/ShowTimeApp.java           |   29 ++
 test-apps/simple-deployable-app-1/build.gradle     |   33 +-
 .../main/java/org/apache/s4/deploy/SimplePE.java   |    2 +-
 .../main/java/org/apache/s4/deploy/TestApp.java    |    2 +-
 test-apps/simple-deployable-app-2/build.gradle     |   33 +-
 .../main/java/org/apache/s4/deploy/TestApp.java    |    2 +-
 test-apps/twitter-adapter/README.txt               |    1 +
 test-apps/twitter-adapter/build.gradle             |  209 +++++++++++++
 .../s4/example/twitter/TwitterInputAdapter.java    |  120 +++++++
 .../src/main/resources/s4.properties               |   18 +
 test-apps/twitter-counter/README.txt               |   33 ++
 test-apps/twitter-counter/build.gradle             |  201 ++++++++++++
 .../org/apache/s4/example/twitter/TopNTopicPE.java |   96 ++++++
 .../s4/example/twitter/TopicCountAndReportPE.java  |   56 ++++
 .../org/apache/s4/example/twitter/TopicEvent.java  |   30 ++
 .../s4/example/twitter/TopicExtractorPE.java       |   59 ++++
 .../s4/example/twitter/TwitterCounterApp.java      |   72 +++++
 .../src/main/resources/default.s4.properties       |   19 ++
 163 files changed, 5759 insertions(+), 2540 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/121f3306/build.gradle
----------------------------------------------------------------------
diff --cc build.gradle
index 818b968,4639a8a..54c70ee
--- a/build.gradle
+++ b/build.gradle
@@@ -142,21 -154,21 +144,7 @@@ dependencies 
      platformLibs platformProjects
  }
  
--//task xxx {
--//    platformProjects.collect {proj ->
--//        // Iterate over the source sets
--//        println '\n' + proj.name + ' ' + proj.sourceSets.main.name
--//        println proj.sourceSets.main.classesDir
--//        println proj.sourceSets.main.runtimeClasspath.getFiles()
--//        println proj.sourceSets.main.resources.getFiles()
--//        println proj.sourceSets.main.allJava.getFiles()
--//        println proj.configurations.runtime.getFiles()
--//        println ' '
--//        println proj.configurations.archives.allArtifactFiles.getFiles()
--//    }
--//}
--
- binDistImage = copySpec {
+ project.ext["binDistImage"] = copySpec {
      platformProjects.collect {proj ->
          into ("platform/lib") {
              from proj.sourceSets.main.resources

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/121f3306/gradle/wrapper/gradle-wrapper.properties
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/121f3306/settings.gradle
----------------------------------------------------------------------
diff --cc settings.gradle
index 71b1207,33827ba..ec6cb7d
--- a/settings.gradle
+++ b/settings.gradle
@@@ -16,10 -16,14 +16,15 @@@
  include 's4-base'
  include 's4-core'
  include 's4-comm'
 +include 's4-edsl'
  include 's4-example'
+ include 's4-tools'
+ //include 's4-example'
+ //include ':test-apps:simple-adapter-1'
  include ':test-apps:simple-deployable-app-1'
  include ':test-apps:simple-deployable-app-2'
+ include ':test-apps:s4-showtime'
+ include ':test-apps:s4-counter'
  
  rootProject.name = 's4'
  rootProject.children.each {project ->

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/121f3306/subprojects/s4-base/src/main/java/org/apache/s4/base/Emitter.java
----------------------------------------------------------------------
diff --cc subprojects/s4-base/src/main/java/org/apache/s4/base/Emitter.java
index ad511c9,08a291d..54c08df
--- a/subprojects/s4-base/src/main/java/org/apache/s4/base/Emitter.java
+++ b/subprojects/s4-base/src/main/java/org/apache/s4/base/Emitter.java
@@@ -9,9 -9,7 +9,9 @@@ public interface Emitter 
       * 
       * @return - true - if message is sent across successfully - false - if send fails
       */
-     boolean send(int partitionId, byte[] message);
+     boolean send(int partitionId, EventMessage message);
  
      int getPartitionCount();
 +
 +    void close();
  }

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/121f3306/subprojects/s4-comm/s4-comm.gradle
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/121f3306/subprojects/s4-comm/src/main/java/org/apache/s4/comm/QueueingEmitter.java
----------------------------------------------------------------------
diff --cc subprojects/s4-comm/src/main/java/org/apache/s4/comm/QueueingEmitter.java
index 7218afe,7fd6773..3dd2955
--- a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/QueueingEmitter.java
+++ b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/QueueingEmitter.java
@@@ -94,10 -96,4 +95,9 @@@ public class QueueingEmitter implement
          }
      }
  
 +    @Override
 +    public void close() {
 +        // TODO Auto-generated method stub
 +
 +    }
- 
  }

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/121f3306/subprojects/s4-comm/src/main/java/org/apache/s4/comm/loopback/LoopBackEmitter.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/121f3306/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/TCPEmitter.java
----------------------------------------------------------------------
diff --cc subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/TCPEmitter.java
index 171315a,7bff4af..5caafc9
--- a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/TCPEmitter.java
+++ b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/TCPEmitter.java
@@@ -36,303 -32,119 +38,309 @@@ import org.slf4j.Logger
  import org.slf4j.LoggerFactory;
  
  import com.google.common.collect.HashBiMap;
 +import com.google.common.util.concurrent.ThreadFactoryBuilder;
  import com.google.inject.Inject;
 +import com.google.inject.name.Named;
  
  /**
 + * <p>
 + * TCPEmitter - Uses TCP to send messages across partitions. It
 + * <ul>
 + * <li>guarantees message delivery</li>
 + * <li>preserves pair-wise message ordering; might end up sending duplicates to ensure the order</li>
 + * <li>tolerates topology changes, partition re-mapping and network glitches</li>
 + * </ul>
 + * </p>
   * 
 - * Sends messages through TCP, to the associated subcluster.
 - * 
 + * <p>
 + * TCPEmitter is designed as follows:
 + * <ul>
 + * <li>maintains per-partition queue of {@code Message}s</li>
 + * <li> <code>send(p, m)</code> queues the message 'm' to partition 'p'</li>
 + * <li>a thread-pool is used to send the messages asynchronously to the appropriate partitions; send operations between
 + * a pair of partitions are serialized</li>
 + * <li>Each {@code Message} implements the {@link ChannelFutureListener} and listens on the {@link ChannelFuture}
 + * corresponding to the send operation</li>
 + * <li>On success, the message marks itself as sent; messages marked sent at the head of the queue are removed</li>
 + * <li>On failure of a message m, 'm' and all the messages queued after 'm' are resent to preserve message ordering</li>
 + * </ul>
 + * </p>
   */
 -public class TCPEmitter implements Emitter, ChannelFutureListener, ClusterChangeListener {
 +
- public class TCPEmitter implements Emitter, TopologyChangeListener {
++public class TCPEmitter implements Emitter, ClusterChangeListener {
      private static final Logger logger = LoggerFactory.getLogger(TCPEmitter.class);
 -    private static final int BUFFER_SIZE = 10;
 -    private static final int NUM_RETRIES = 10;
 +
 +    private final int numRetries;
 +    private final int retryDelayMs;
 +    private final int nettyTimeout;
 +    private final int bufferCapacity;
  
-     private Topology topology;
+     private Cluster topology;
      private final ClientBootstrap bootstrap;
  
 +    /*
 +     * debug information
 +     */
 +    private volatile int instanceId = 0;
 +
 +    /*
 +     * All channels
 +     */
 +    private final ChannelGroup channels = new DefaultChannelGroup();
 +
 +    /*
 +     * Channel used to send messages to each partition
 +     */
 +    private final HashBiMap<Integer, Channel> partitionChannelMap;
 +
 +    /*
 +     * Node hosting each partition
 +     */
 +    private final HashBiMap<Integer, ClusterNode> partitionNodeMap;
 +
 +    /*
 +     * Messages to be sent, stored per partition
 +     */
 +    private final Hashtable<Integer, SendQueue> sendQueues;
 +
 +    /*
 +     * Thread pool to actually send messages
 +     */
 +    private final ExecutorService sendService;
 +
      @Inject
-     public TCPEmitter(Topology topology, @Named("tcp.partition.queue_size") int bufferSize,
+     SerializerDeserializer serDeser;
+ 
 -    static class MessageQueuesPerPartition {
 -        private Hashtable<Integer, Queue<byte[]>> queues = new Hashtable<Integer, Queue<byte[]>>();
 -        private boolean bounded;
++    @Inject
++    public TCPEmitter(Cluster topology, @Named("tcp.partition.queue_size") int bufferSize,
 +            @Named("comm.retries") int retries, @Named("comm.retry_delay") int retryDelay,
 +            @Named("comm.timeout") int timeout) throws InterruptedException {
 +        this.numRetries = retries;
 +        this.retryDelayMs = retryDelay;
 +        this.nettyTimeout = timeout;
 +        this.bufferCapacity = bufferSize;
 +        this.topology = topology;
 +        this.topology.addListener(this);
  
 -        MessageQueuesPerPartition(boolean bounded) {
 -            this.bounded = bounded;
 +        // Initialize data structures
-         int clusterSize = this.topology.getTopology().getNodes().size();
++        int clusterSize = this.topology.getPhysicalCluster().getNodes().size();
 +        partitionChannelMap = HashBiMap.create(clusterSize);
 +        partitionNodeMap = HashBiMap.create(clusterSize);
 +        sendQueues = new Hashtable<Integer, SendQueue>(clusterSize);
 +
 +        // Initialize sendService
 +        int numCores = Runtime.getRuntime().availableProcessors();
 +        sendService = Executors.newFixedThreadPool(2 * numCores,
 +                new ThreadFactoryBuilder().setNameFormat("TCPEmitterSendServiceThread-#" + instanceId++ + "-%d")
 +                        .setUncaughtExceptionHandler(new UncaughtExceptionHandler() {
 +                            @Override
 +                            public void uncaughtException(Thread paramThread, Throwable paramThrowable) {
 +                                logger.error("Cannot send message", paramThrowable);
 +                            }
 +                        }).build());
 +
 +        // Initialize netty related structures
 +        ChannelFactory factory = new NioClientSocketChannelFactory(Executors.newCachedThreadPool(),
 +                Executors.newCachedThreadPool());
 +        bootstrap = new ClientBootstrap(factory);
 +        bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
 +            @Override
 +            public ChannelPipeline getPipeline() {
 +                ChannelPipeline p = Channels.pipeline();
 +                p.addLast("1", new LengthFieldPrepender(4));
 +                p.addLast("2", new NotifyChannelInterestChange());
 +                return p;
 +            }
 +        });
 +
 +        bootstrap.setOption("tcpNoDelay", true);
 +        bootstrap.setOption("keepAlive", true);
 +        bootstrap.setOption("reuseAddress", true);
 +        bootstrap.setOption("connectTimeoutMillis", this.nettyTimeout);
 +    }
 +
 +    private class Message implements ChannelFutureListener {
 +        private final SendQueue sendQ;
 +        private final byte[] message;
 +        private boolean sendSuccess = false;
++        @Inject
++        SerializerDeserializer serDeser;
 +
 +        Message(SendQueue sendQ, byte[] message) {
 +            this.sendQ = sendQ;
 +            this.message = message;
          }
  
 -        private boolean add(int partitionId, byte[] message) {
 -            Queue<byte[]> messages = queues.get(partitionId);
 +        private void sendMessage() {
 +            sendQ.emitter.sendMessage(sendQ.partitionId, this);
 +        }
  
 -            if (messages == null) {
 -                messages = new ArrayDeque<byte[]>();
 -                queues.put(partitionId, messages);
 +        private void messageSendFailure() {
 +            logger.debug("Message send to partition {} has failed", sendQ.partitionId);
 +            synchronized (sendQ.failureFound) {
 +                sendQ.failureFound = true;
              }
 +            removeChannel(sendQ.partitionId);
 +            sendQ.spawnSendTask();
 +        }
  
 -            if (bounded && messages.size() >= BUFFER_SIZE) {
 -                // Too many messages already queued
 -                return false;
 +        @Override
 +        public void operationComplete(ChannelFuture future) throws Exception {
 +            if (future.isSuccess()) {
 +                sendSuccess = true;
 +                sendQ.clearWire();
 +                return;
              }
  
 -            messages.offer(message);
 -            return true;
 +            if (future.isCancelled()) {
 +                logger.error("Send I/O cancelled to " + future.getChannel().getRemoteAddress());
 +            }
 +
 +            // failed operation
 +            messageSendFailure();
          }
 +    }
  
 -        private byte[] peek(int partitionId) {
 -            Queue<byte[]> messages = queues.get(partitionId);
 +    private class SendQueue {
 +        private final TCPEmitter emitter;
 +        private final int partitionId;
 +        private final int bufferCapacity;
 +        private final Queue<Message> pending; // messages to be sent
 +        private final Queue<Message> wire; // messages in transit
 +
 +        private Integer bufferSize = 0;
 +        private Boolean sending = false;
 +        private Boolean failureFound = false;
 +        private Boolean newMessages = false;
 +
 +        SendQueue(TCPEmitter emitter, int partitionId, int bufferCapacity) {
 +            this.emitter = emitter;
 +            this.partitionId = partitionId;
 +            this.bufferCapacity = bufferCapacity;
 +            this.pending = new ConcurrentLinkedQueue<Message>();
 +            this.wire = new ConcurrentLinkedQueue<Message>();
 +        }
  
 -            try {
 -                return messages.peek();
 -            } catch (NullPointerException npe) {
 -                return null;
 +        private boolean lock() {
 +            if (sending)
 +                return false;
  
 -            }
 +            sending = true;
 +            return true;
          }
  
 -        private void remove(int partitionId) {
 -            Queue<byte[]> messages = queues.get(partitionId);
 +        private void unlock() {
 +            sending = false;
 +        }
  
 -            if (messages.isEmpty()) {
 -                logger.error("Trying to remove messages from an empty queue for partition" + partitionId);
 -                return;
 +        private boolean offer(byte[] message) {
 +            Message m = new Message(this, message);
 +            synchronized (bufferSize) {
 +                if (bufferSize >= bufferCapacity) {
 +                    return false;
 +                }
 +                bufferSize++;
              }
  
 -            if (messages != null)
 -                messages.remove();
 +            pending.add(m);
 +            spawnSendTask();
 +            return true;
 +
          }
 -    }
  
 -    private HashBiMap<Integer, Channel> partitionChannelMap;
 -    private HashBiMap<Integer, ClusterNode> partitionNodeMap;
 -    private MessageQueuesPerPartition queuedMessages = new MessageQueuesPerPartition(true);
 +        public void clearWire() {
 +            while (!wire.isEmpty()) {
 +                Message msg = wire.peek();
 +                if (!msg.sendSuccess)
 +                    return;
 +                wire.remove();
 +                synchronized (bufferSize) {
 +                    bufferSize--;
 +                }
 +            }
 +        }
  
 -    @Inject
 -    public TCPEmitter(Cluster topology) throws InterruptedException {
 -        this.topology = topology;
 -        topology.addListener(this);
 -        int clusterSize = this.topology.getPhysicalCluster().getNodes().size();
 +        private void spawnSendTask() {
 +            // Lock before spawning a new SendTask
 +            boolean acquired = lock();
 +            if (acquired) {
 +                try {
 +                    emitter.sendService.execute(new SendTask(this));
 +                } finally {
 +                    unlock();
 +                }
 +            } else {
 +                synchronized (newMessages) {
 +                    newMessages = true;
 +                }
 +            }
 +        }
  
 -        partitionChannelMap = HashBiMap.create(clusterSize);
 -        partitionNodeMap = HashBiMap.create(clusterSize);
 +        private void resendWiredMessages() {
 +            clearWire();
 +            for (Message msg : wire) {
 +                msg.sendMessage();
 +            }
 +        }
  
 -        ChannelFactory factory = new NioClientSocketChannelFactory(Executors.newCachedThreadPool(),
 -                Executors.newCachedThreadPool());
 +        private void sendPendingMessages() {
 +            Message msg = null;
 +            while ((msg = pending.poll()) != null) {
 +                msg.sendMessage();
 +                wire.add(msg);
 +            }
 +        }
  
 -        bootstrap = new ClientBootstrap(factory);
 +        private void sendMessages() {
 +            while (true) {
 +                boolean resend = false;
 +                synchronized (failureFound) {
 +                    if (failureFound) {
 +                        resend = true;
 +                        failureFound = false;
 +                    } else
 +                        break;
 +                }
  
 -        bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
 -            @Override
 -            public ChannelPipeline getPipeline() {
 -                ChannelPipeline p = Channels.pipeline();
 -                p.addLast("Framing", new LengthFieldPrepender(4));
 -                p.addLast("ChannelStateMonitor", new ChannelStateMonitoringHandler());
 -                return p;
 +                if (resend)
 +                    resendWiredMessages();
              }
 -        });
  
 -        bootstrap.setOption("tcpNoDelay", true);
 -        bootstrap.setOption("keepAlive", true);
 +            while (true) {
 +                sendPendingMessages();
 +                synchronized (newMessages) {
 +                    if (newMessages) {
 +                        newMessages = false;
 +                        continue;
 +                    } else {
 +                        unlock();
 +                        break;
 +                    }
 +                }
 +            }
 +        }
 +    }
 +
 +    private class SendTask implements Runnable {
 +        private final SendQueue sendQ;
 +
 +        SendTask(SendQueue sendQ) {
 +            this.sendQ = sendQ;
 +        }
 +
 +        @Override
 +        public void run() {
 +            sendQ.sendMessages();
 +        }
      }
  
      private boolean connectTo(Integer partitionId) {
          ClusterNode clusterNode = partitionNodeMap.get(partitionId);
  
          if (clusterNode == null) {
 -            if (topology.getPhysicalCluster().getNodes().size() == 0) {
 -                logger.error("No node in cluster ");
 -                return false;
 -            }
 -            clusterNode = topology.getPhysicalCluster().getNodes().get(partitionId);
 -            partitionNodeMap.forcePut(partitionId, clusterNode);
 -        }
+ 
 -        if (clusterNode == null) {
              logger.error("No ClusterNode exists for partitionId " + partitionId);
 +            onChange();
              return false;
          }
  
@@@ -389,113 -188,127 +397,133 @@@
              }
          }
  
 -        // /*
 -        // * Try limiting the size of the send queue inside Netty
 -
 -        // FIXME this does not work: in case of a disconnection, the channel remains non writeable, and the lock is
 -        // never released, hence blocking.
 -        // should be fixed using: 1/ configurable timeouts (should drop pending messages after the timeout) 2/ make sure
 -        // the handler is correctly responding to disconnections/reconnections 3/ there should be a lock per channel,
 -        // no?
 -
 -        // NOTE: might be fixed in S4-7
 -
 -        // */
 -        // if (!channel.isWritable()) {
 -        // synchronized (sendLock) {
 -        // // check again now that we have the lock
 -        // while (!channel.isWritable()) {
 -        // try {
 -        // sendLock.wait();
 -        // } catch (InterruptedException ie) {
 -        // return false;
 -        // }
 -        // }
 -        // }
 -        // }
 -
 -        // /*
 -        // * Channel is available. Write messages in the following order: (1) Messages already on wire, (2) Previously
 -        // * buffered messages, and (3) the Current Message
 -        // *
 -        // * Once the channel returns success delete from the messagesOnTheWire
 -        // */
 -        // byte[] messageBeingSent = null;
 -        // // while ((messageBeingSent = messagesOnTheWire.peek(partitionId)) != null) {
 -        // // writeMessageToChannel(channel, partitionId, messageBeingSent, false);
 -        // // }
 -        //
 -        // while ((messageBeingSent = queuedMessages.peek(partitionId)) != null) {
 -        // writeMessageToChannel(channel, partitionId, messageBeingSent);
 -        // queuedMessages.remove(partitionId);
 -        // }
 -
 -        writeMessageToChannel(channel, partitionId, serDeser.serialize(message));
 -        return true;
 +        if (!messageSent) {
 +            m.messageSendFailure();
 +        }
+ 
      }
  
      @Override
-     public boolean send(int partitionId, byte[] message) {
 -    public void operationComplete(ChannelFuture f) {
 -        int partitionId = partitionChannelMap.inverse().get(f.getChannel());
 -        if (f.isSuccess()) {
 -            // messagesOnTheWire.remove(partitionId);
++    public boolean send(int partitionId, EventMessage message) {
 +        if (!sendQueues.containsKey(partitionId)) {
 +            SendQueue sendQueue = new SendQueue(this, partitionId, this.bufferCapacity);
 +            sendQueues.put(partitionId, sendQueue);
          }
  
-         return sendQueues.get(partitionId).offer(message);
 -        if (f.isCancelled()) {
 -            logger.error("Send I/O was cancelled! " + f.getChannel().getRemoteAddress());
 -        } else if (!f.isSuccess()) {
 -            logger.error("Exception on I/O operation", f.getCause());
 -            logger.error(String.format("I/O on partition %d failed!", partitionId));
 -            partitionChannelMap.remove(partitionId);
++        return sendQueues.get(partitionId).offer(serDeser.serialize(message));
 +    }
 +
 +    protected void removeChannel(int partition) {
 +        Channel c = partitionChannelMap.remove(partition);
 +        if (c == null)
 +            return;
 +
 +        c.close().addListener(new ChannelFutureListener() {
 +            @Override
 +            public void operationComplete(ChannelFuture future) throws Exception {
 +                if (future.isSuccess())
 +                    channels.remove(future.getChannel());
 +                else
 +                    logger.error("FAILED to close channel");
 +            }
 +        });
 +    }
 +
 +    public void close() {
 +        for (SendQueue sendQ : sendQueues.values()) {
 +            if (!sendQ.wire.isEmpty()) {
 +                logger.error("TCPEmitter could not deliver {} messages to partition {}", sendQ.wire.size(),
 +                        sendQ.partitionId);
 +                sendQ.wire.clear();
 +            }
 +
 +            if (!sendQ.pending.isEmpty()) {
 +                logger.error("TCPEmitter could not send {} messages to partition {}", sendQ.pending.size(),
 +                        sendQ.partitionId);
 +                sendQ.pending.clear();
 +            }
 +        }
 +
 +        try {
 +            channels.close().await();
 +            bootstrap.releaseExternalResources();
 +        } catch (InterruptedException ie) {
 +            logger.error("Interrupted while closing");
 +            ie.printStackTrace();
          }
      }
  
      @Override
      public void onChange() {
-         for (ClusterNode clusterNode : topology.getTopology().getNodes()) {
 -        /*
 -         * Close the channels that correspond to changed partitions and update partitionNodeMap
 -         */
+         for (ClusterNode clusterNode : topology.getPhysicalCluster().getNodes()) {
              Integer partition = clusterNode.getPartition();
 -            ClusterNode oldNode = partitionNodeMap.get(partition);
 +            if (partition == null) {
 +                logger.error("onChange(): Illegal partition for clusterNode - " + clusterNode);
 +                return;
 +            }
  
 +            ClusterNode oldNode = partitionNodeMap.remove(partition);
              if (oldNode != null && !oldNode.equals(clusterNode)) {
 -                if (partitionChannelMap.containsKey(partition)) {
 -                    partitionChannelMap.remove(partition).close();
 -                }
 +                removeChannel(partition);
              }
 -
              partitionNodeMap.forcePut(partition, clusterNode);
          }
      }
  
      @Override
      public int getPartitionCount() {
-         return topology.getTopology().getPartitionCount();
 -        // Number of nodes is not same as number of partitions
+         return topology.getPhysicalCluster().getPartitionCount();
      }
  
 -    class ChannelStateMonitoringHandler extends SimpleChannelHandler {
 +    class NotifyChannelInterestChange extends SimpleChannelHandler {
          @Override
          public void channelInterestChanged(ChannelHandlerContext ctx, ChannelStateEvent e) {
 -            // logger.info(String.format("%08x %08x %08x", e.getValue(),
 -            // e.getChannel().getInterestOps(), Channel.OP_WRITE));
 -            // FIXME see above comments about fixing the buffering of messages
 -            // synchronized (sendLock) {
 -            // if (e.getChannel().isWritable()) {
 -            // sendLock.notify();
 -            // }
 -            // }
 +            Channel c = e.getChannel();
 +            Integer partitionId = partitionChannelMap.inverse().get(c);
 +            if (partitionId == null) {
 +                logger.debug("channelInterestChanged for an unknown/deleted channel");
 +                return;
 +            }
 +
 +            SendQueue sendQ = sendQueues.get(partitionId);
 +            synchronized (sendQ) {
 +                if (c.isWritable()) {
 +                    sendQ.notify();
 +                }
 +            }
++
              ctx.sendUpstream(e);
          }
  
          @Override
          public void exceptionCaught(ChannelHandlerContext context, ExceptionEvent event) {
 -            Integer partitionId = partitionChannelMap.inverse().get(context.getChannel());
 -            String target;
 -            if (partitionId == null) {
 -                target = "unknown channel";
 -            } else {
 -                target = "channel for partition [" + partitionId + "], target node host ["
 -                        + partitionNodeMap.get(partitionId).getMachineName() + "], target node port ["
 -                        + partitionNodeMap.get(partitionId).getPort() + "]";
 -            }
 -            logger.error(
 -                    "Error on [{}]. This can be due to a disconnection of the receiver node. Channel will be closed.",
 -                    target);
 +            try {
 +                throw event.getCause();
 +            } catch (ClosedChannelException cce) {
 +                return;
 +            } catch (ConnectException ce) {
 +                return;
 +            } catch (Throwable e) {
 +                e.printStackTrace();
++                // Integer partitionId = partitionChannelMap.inverse().get(context.getChannel());
++                // String target;
++                // if (partitionId == null) {
++                // target = "unknown channel";
++                // } else {
++                // target = "channel for partition [" + partitionId + "], target node host ["
++                // + partitionNodeMap.get(partitionId).getMachineName() + "], target node port ["
++                // + partitionNodeMap.get(partitionId).getPort() + "]";
++                // }
++                // logger.error(
++                // "Error on [{}]. This can be due to a disconnection of the receiver node. Channel will be closed.",
++                // target);
++                //
++                // if (context.getChannel().isOpen()) {
++                // logger.info("Closing channel [{}] due to exception [{}]", target, event.getCause().getMessage());
++                // context.getChannel().close();
++                // }
+ 
 -            if (context.getChannel().isOpen()) {
 -                logger.info("Closing channel [{}] due to exception [{}]", target, event.getCause().getMessage());
 -                context.getChannel().close();
              }
 -
          }
      }
  }

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/121f3306/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/TCPListener.java
----------------------------------------------------------------------
diff --cc subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/TCPListener.java
index af019c0,58348af..2aa14f0
--- a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/TCPListener.java
+++ b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/TCPListener.java
@@@ -29,21 -25,20 +30,25 @@@ import org.slf4j.Logger
  import org.slf4j.LoggerFactory;
  
  import com.google.inject.Inject;
 +import com.google.inject.name.Named;
  
+ /**
+  * Receives messages through TCP for the assigned subcluster.
+  * 
+  */
  public class TCPListener implements Listener {
 +    private static final Logger logger = LoggerFactory.getLogger(TCPListener.class);
      private BlockingQueue<byte[]> handoffQueue = new SynchronousQueue<byte[]>();
      private ClusterNode node;
 -    private static final Logger logger = LoggerFactory.getLogger(TCPListener.class);
 +    private ServerBootstrap bootstrap;
 +    private final ChannelGroup channels = new DefaultChannelGroup();
 +    private final int nettyTimeout;
  
      @Inject
 -    public TCPListener(Assignment assignment) {
 +    public TCPListener(Assignment assignment, @Named("comm.timeout") int timeout) {
          // wait for an assignment
          node = assignment.assignClusterNode();
 +        nettyTimeout = timeout;
  
          ChannelFactory factory = new NioServerSocketChannelFactory(Executors.newCachedThreadPool(),
                  Executors.newCachedThreadPool());
@@@ -112,16 -92,17 +117,23 @@@
  
          public void exceptionCaught(ChannelHandlerContext context, ExceptionEvent event) {
              logger.error("Error", event.getCause());
 -            if (context.getChannel().isOpen()) {
 -                logger.error("Closing channel due to exception");
 -                context.getChannel().close();
 -            }
 +            Channel c = context.getChannel();
 +            c.close().addListener(new ChannelFutureListener() {
 +                @Override
 +                public void operationComplete(ChannelFuture future) throws Exception {
 +                    if (future.isSuccess())
 +                        channels.remove(future.getChannel());
 +                    else
 +                        logger.error("FAILED to close channel");
 +                }
 +            });
          }
+ 
+         @Override
+         public void channelClosed(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
+             // TODO Auto-generated method stub
+             super.channelClosed(ctx, e);
+         }
+ 
      }
  }

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/121f3306/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/TCPRemoteEmitter.java
----------------------------------------------------------------------
diff --cc subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/TCPRemoteEmitter.java
index 0000000,7dfa2c8..3fd5bcf
mode 000000,100644..100644
--- a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/TCPRemoteEmitter.java
+++ b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/TCPRemoteEmitter.java
@@@ -1,0 -1,24 +1,27 @@@
+ package org.apache.s4.comm.tcp;
+ 
+ import org.apache.s4.base.RemoteEmitter;
+ import org.apache.s4.comm.topology.Cluster;
+ 
+ import com.google.inject.Inject;
+ import com.google.inject.assistedinject.Assisted;
++import com.google.inject.name.Named;
+ 
+ /**
+  * Emitter to remote subclusters.
+  * 
+  */
+ public class TCPRemoteEmitter extends TCPEmitter implements RemoteEmitter {
+ 
+     /**
+      * Sends to remote subclusters. This is dynamically created, through an injected factory, when new subclusters are
+      * discovered (as remote streams outputs)
+      */
+     @Inject
 -    public TCPRemoteEmitter(@Assisted Cluster topology) throws InterruptedException {
 -        super(topology);
++    public TCPRemoteEmitter(@Assisted Cluster topology, @Named("tcp.partition.queue_size") int bufferSize,
++            @Named("comm.retries") int retries, @Named("comm.retry_delay") int retryDelay,
++            @Named("comm.timeout") int timeout) throws InterruptedException {
++        super(topology, bufferSize, retries, retryDelay, timeout);
+     }
+ 
+ }

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/121f3306/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/Cluster.java
----------------------------------------------------------------------
diff --cc subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/Cluster.java
index 17f4037,6813f3f..a9066a4
--- a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/Cluster.java
+++ b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/Cluster.java
@@@ -1,143 -1,13 +1,14 @@@
  package org.apache.s4.comm.topology;
  
- import java.io.IOException;
- import java.util.ArrayList;
- import java.util.Collections;
- import java.util.List;
- import java.util.Set;
- 
- import org.slf4j.Logger;
- import org.slf4j.LoggerFactory;
- 
- import com.google.common.collect.HashBiMap;
- import com.google.inject.Inject;
- import com.google.inject.Singleton;
- import com.google.inject.name.Named;
- 
  /**
-  * 
-  * The S4 physical cluster implementation.
+  * Represents a logical cluster
   * 
   */
- @Singleton
- public class Cluster {
- 
-     // TODO: do we need a Cluster interface to represent different types of
-     // implementations?
- 
-     private static final Logger logger = LoggerFactory.getLogger(Cluster.class);
- 
-     // List<ClusterNode> nodes = new ArrayList<ClusterNode>();
-     HashBiMap<Integer, ClusterNode> nodes = HashBiMap.create();
-     String mode = "unicast";
-     String name = "unknown";
- 
-     final private String[] hosts;
-     final private String[] ports;
-     final private int numNodes;
-     private int numPartitions;
- 
-     public Cluster(int numPartitions) {
-         this.hosts = new String[] {};
-         this.ports = new String[] {};
-         this.numNodes = 0;
-         this.numPartitions = numPartitions;
-     }
- 
-     /**
-      * Define the hosts and corresponding ports in the cluster.
-      * 
-      * @param hosts
-      *            a comma separates list of host names.
-      * @param ports
-      *            a comma separated list of ports.
-      * @throws IOException
-      *             if number of hosts and ports don't match.
-      */
-     @Inject
-     Cluster(@Named("cluster.hosts") String hosts, @Named("cluster.ports") String ports) throws IOException {
-         if (hosts != null && hosts.length() > 0 && ports != null && ports.length() > 0) {
-             this.ports = ports.split(",");
-             this.hosts = hosts.split(",");
- 
-             if (this.ports.length != this.hosts.length) {
-                 logger.error("Number of hosts should match number of ports in properties file. hosts: " + hosts
-                         + " ports: " + ports);
-                 throw new IOException();
-             }
- 
-             numNodes = this.hosts.length;
-             for (int i = 0; i < numNodes; i++) {
-                 ClusterNode node = new ClusterNode(i, Integer.parseInt(this.ports[i]), this.hosts[i], "");
-                 addNode(node);
-                 logger.info("Added cluster node: " + this.hosts[i] + ":" + this.ports[i]);
-             }
-             numPartitions = numNodes;
-         } else {
-             this.hosts = new String[] {};
-             this.ports = new String[] {};
-             this.numNodes = 0;
- 
-         }
-     }
- 
-     /**
-      * Number of partitions in the cluster.
-      * 
-      * @return
-      */
-     public int getPartitionCount() {
-         return numPartitions;
-     }
- 
-     /**
-      * @param node
-      */
-     public void addNode(ClusterNode node) {
-         // nodes.add(node);
-         if (nodes.containsKey(node.getPartition())) {
-             logger.debug("Partition {} already has a corresponding ClusterNode", node.getPartition());
-         }
-         if (nodes.containsValue(node)) {
-             logger.error("ClusterNode is already mapped to partition {}", nodes.inverse().get(node));
-         }
-         nodes.forcePut(node.getPartition(), node);
-     }
- 
-     /**
-      * @return a list of {@link ClusterNode} objects available in the cluster.
-      */
-     public Set<ClusterNode> getNodes() {
-         return Collections.unmodifiableSet(nodes.values());
-     }
- 
-     // TODO: do we need mode and name? Making provate for now.
- 
-     @SuppressWarnings("unused")
-     private String getMode() {
-         return mode;
-     }
- 
-     @SuppressWarnings("unused")
-     private void setMode(String mode) {
-         this.mode = mode;
-     }
- 
-     @SuppressWarnings("unused")
-     private String getName() {
-         return name;
-     }
 +
-     @SuppressWarnings("unused")
-     private void setName(String name) {
-         this.name = name;
-     }
+ public interface Cluster {
+     public PhysicalCluster getPhysicalCluster();
  
-     public String toString() {
-         StringBuffer sb = new StringBuffer();
-         sb.append("{name=").append(name).append(",mode=").append(mode).append(",type=").append(",nodes=").append(nodes)
-                 .append("}");
-         return sb.toString();
-     }
+     public void addListener(ClusterChangeListener listener);
  
+     public void removeListener(ClusterChangeListener listener);
  }

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/121f3306/subprojects/s4-comm/src/main/java/org/apache/s4/comm/udp/UDPEmitter.java
----------------------------------------------------------------------
diff --cc subprojects/s4-comm/src/main/java/org/apache/s4/comm/udp/UDPEmitter.java
index 3d5a199,8fab1a6..dc700ea
--- a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/udp/UDPEmitter.java
+++ b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/udp/UDPEmitter.java
@@@ -9,10 -9,11 +9,12 @@@ import java.util.HashMap
  import java.util.Map;
  
  import org.apache.s4.base.Emitter;
+ import org.apache.s4.base.EventMessage;
+ import org.apache.s4.base.SerializerDeserializer;
 -import org.apache.s4.comm.topology.ClusterNode;
+ import org.apache.s4.comm.topology.Cluster;
+ import org.apache.s4.comm.topology.ClusterChangeListener;
 +import org.apache.s4.comm.topology.ClusterNode;
- import org.apache.s4.comm.topology.Topology;
- import org.apache.s4.comm.topology.TopologyChangeListener;
 +import org.slf4j.LoggerFactory;
  
  import com.google.common.collect.HashBiMap;
  import com.google.inject.Inject;
@@@ -47,14 -49,12 +50,15 @@@ public class UDPEmitter implements Emit
      }
  
      @Override
-     public boolean send(int partitionId, byte[] message) {
+     public boolean send(int partitionId, EventMessage eventMessage) {
          try {
+             byte[] message = serDeser.serialize(eventMessage);
              ClusterNode node = nodes.get(partitionId);
              if (node == null) {
 -                throw new RuntimeException(String.format("Bad partition id %d", partitionId));
 +                LoggerFactory.getLogger(getClass()).error(
 +                        "Cannot send message to partition {} because this partition is not visible to this emitter",
 +                        partitionId);
 +                return false;
              }
              byte[] byteBuffer = new byte[message.length];
              System.arraycopy(message, 0, byteBuffer, 0, message.length);
@@@ -81,9 -81,9 +85,9 @@@
      public void onChange() {
          // topology changes when processes pick tasks
          synchronized (nodes) {
-             for (ClusterNode clusterNode : topology.getTopology().getNodes()) {
+             for (ClusterNode clusterNode : topology.getPhysicalCluster().getNodes()) {
                  Integer partition = clusterNode.getPartition();
 -                nodes.put(partition, clusterNode);
 +                nodes.forcePut(partition, clusterNode);
              }
          }
      }

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/121f3306/subprojects/s4-comm/src/main/resources/default.s4.comm.properties
----------------------------------------------------------------------
diff --cc subprojects/s4-comm/src/main/resources/default.s4.comm.properties
index 0000000,07497b2..0c5bbfd
mode 000000,100644..100644
--- a/subprojects/s4-comm/src/main/resources/default.s4.comm.properties
+++ b/subprojects/s4-comm/src/main/resources/default.s4.comm.properties
@@@ -1,0 -1,8 +1,12 @@@
+ comm.queue_emmiter_size = 8000
+ comm.queue_listener_size = 8000
+ comm.emitter.class=org.apache.s4.comm.tcp.TCPEmitter
+ comm.emitter.remote.class=org.apache.s4.comm.tcp.TCPRemoteEmitter
+ comm.listener.class=org.apache.s4.comm.tcp.TCPListener
++comm.retries=10
++comm.retry_delay=10
++comm.timeout=1000
++tcp.partition.queue_size=256
+ cluster.zk_address = localhost:2181
+ cluster.zk_session_timeout = 10000
+ cluster.zk_connection_timeout = 10000

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/121f3306/subprojects/s4-comm/src/test/java/org/apache/s4/comm/tcp/MultiPartitionDeliveryTest.java
----------------------------------------------------------------------
diff --cc subprojects/s4-comm/src/test/java/org/apache/s4/comm/tcp/MultiPartitionDeliveryTest.java
index 8e94c44,0000000..c0e811e
mode 100644,000000..100644
--- a/subprojects/s4-comm/src/test/java/org/apache/s4/comm/tcp/MultiPartitionDeliveryTest.java
+++ b/subprojects/s4-comm/src/test/java/org/apache/s4/comm/tcp/MultiPartitionDeliveryTest.java
@@@ -1,10 -1,0 +1,16 @@@
 +package org.apache.s4.comm.tcp;
 +
++import java.io.IOException;
++
++import org.slf4j.Logger;
 +import org.slf4j.LoggerFactory;
 +
- public class MultiPartitionDeliveryTest extends TCPBasedTest {
-     public MultiPartitionDeliveryTest() {
++public class MultiPartitionDeliveryTest extends TCPCommTest {
++
++    private static Logger logger = LoggerFactory.getLogger(MultiPartitionDeliveryTest.class);
++
++    public MultiPartitionDeliveryTest() throws IOException {
 +        super(6);
 +        logger = LoggerFactory.getLogger(MultiPartitionDeliveryTest.class);
 +    }
 +}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/121f3306/subprojects/s4-comm/src/test/java/org/apache/s4/comm/tcp/NetworkGlitchTest.java
----------------------------------------------------------------------
diff --cc subprojects/s4-comm/src/test/java/org/apache/s4/comm/tcp/NetworkGlitchTest.java
index db9b596,0000000..50ae056
mode 100644,000000..100644
--- a/subprojects/s4-comm/src/test/java/org/apache/s4/comm/tcp/NetworkGlitchTest.java
+++ b/subprojects/s4-comm/src/test/java/org/apache/s4/comm/tcp/NetworkGlitchTest.java
@@@ -1,33 -1,0 +1,38 @@@
 +package org.apache.s4.comm.tcp;
 +
++import java.io.IOException;
++
 +import org.apache.s4.comm.util.PartitionInfo;
 +import org.junit.Assert;
++import org.slf4j.Logger;
 +import org.slf4j.LoggerFactory;
 +
- public class NetworkGlitchTest extends TCPBasedTest {
-     public NetworkGlitchTest() {
++public class NetworkGlitchTest extends TCPCommTest {
++
++    private static Logger logger = LoggerFactory.getLogger(NetworkGlitchTest.class);
++
++    public NetworkGlitchTest() throws IOException {
 +        super(2);
 +        logger = LoggerFactory.getLogger(NetworkGlitchTest.class);
 +    }
 +
-     @Override
 +    public void testDelivery() throws InterruptedException {
 +        PartitionInfo util = partitions[0];
 +
 +        startThreads();
 +
 +        for (int i = 0; i < 4; i++) {
 +            Thread.sleep(500);
 +            logger.debug("Messages sent so far - {}", util.sendThread.sendCounts);
 +            ((TCPEmitter) util.emitter).removeChannel(0);
 +            logger.debug("Channel closed");
 +        }
 +
 +        waitForThreads();
 +
 +        Assert.assertTrue("Message delivery", messageDelivery());
 +
 +        logger.info("Message ordering - " + messageOrdering());
 +        Assert.assertTrue("Pairwise message ordering", messageOrdering());
 +    }
 +}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/121f3306/subprojects/s4-comm/src/test/java/org/apache/s4/comm/tcp/SimpleDeliveryTest.java
----------------------------------------------------------------------
diff --cc subprojects/s4-comm/src/test/java/org/apache/s4/comm/tcp/SimpleDeliveryTest.java
index 64e5c73,0000000..f5d1fc0
mode 100644,000000..100644
--- a/subprojects/s4-comm/src/test/java/org/apache/s4/comm/tcp/SimpleDeliveryTest.java
+++ b/subprojects/s4-comm/src/test/java/org/apache/s4/comm/tcp/SimpleDeliveryTest.java
@@@ -1,10 -1,0 +1,16 @@@
 +package org.apache.s4.comm.tcp;
 +
++import java.io.IOException;
++
++import org.slf4j.Logger;
 +import org.slf4j.LoggerFactory;
 +
- public class SimpleDeliveryTest extends TCPBasedTest {
-     public SimpleDeliveryTest() {
++public class SimpleDeliveryTest extends TCPCommTest {
++
++    private static Logger logger = LoggerFactory.getLogger(SimpleDeliveryTest.class);
++
++    public SimpleDeliveryTest() throws IOException {
 +        super();
 +        logger = LoggerFactory.getLogger(SimpleDeliveryTest.class);
 +    }
 +}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/121f3306/subprojects/s4-comm/src/test/java/org/apache/s4/comm/tcp/TCPCommTest.java
----------------------------------------------------------------------
diff --cc subprojects/s4-comm/src/test/java/org/apache/s4/comm/tcp/TCPCommTest.java
index 0000000,268715b..a65fca4
mode 000000,100644..100644
--- a/subprojects/s4-comm/src/test/java/org/apache/s4/comm/tcp/TCPCommTest.java
+++ b/subprojects/s4-comm/src/test/java/org/apache/s4/comm/tcp/TCPCommTest.java
@@@ -1,0 -1,72 +1,64 @@@
+ package org.apache.s4.comm.tcp;
+ 
+ import java.io.IOException;
+ 
+ import org.apache.s4.comm.DefaultCommModule;
+ import org.apache.s4.comm.DeliveryTestUtil;
 -import org.apache.s4.fixtures.ZkBasedTest;
 -import org.apache.zookeeper.KeeperException;
++import org.apache.s4.comm.util.ProtocolTestUtil;
+ import org.junit.Assert;
 -import org.junit.Before;
 -import org.junit.Test;
++import org.slf4j.Logger;
++import org.slf4j.LoggerFactory;
+ 
+ import com.google.common.io.Resources;
+ import com.google.inject.AbstractModule;
+ import com.google.inject.Guice;
+ import com.google.inject.Injector;
+ import com.google.inject.name.Names;
+ 
 -public class TCPCommTest extends ZkBasedTest {
++public abstract class TCPCommTest extends ProtocolTestUtil {
++
++    private static Logger logger = LoggerFactory.getLogger(TCPCommTest.class);
+     DeliveryTestUtil util;
+     public final static String CLUSTER_NAME = "cluster1";
++    Injector injector;
++
++    public TCPCommTest() throws IOException {
++        super();
++        injector = Guice.createInjector(new DefaultCommModule(Resources.getResource("default.s4.comm.properties")
++                .openStream(), CLUSTER_NAME), new TCPCommTestModule());
++    }
++
++    public TCPCommTest(int numTasks) throws IOException {
++        super(numTasks);
++        injector = Guice.createInjector(new DefaultCommModule(Resources.getResource("default.s4.comm.properties")
++                .openStream(), CLUSTER_NAME), new TCPCommTestModule());
++    }
+ 
 -    @Before
 -    public void setup() throws IOException, InterruptedException, KeeperException {
 -        Injector injector = Guice.createInjector(
 -                new DefaultCommModule(Resources.getResource("default.s4.comm.properties").openStream(), CLUSTER_NAME),
 -                new TCPCommTestModule());
 -        util = injector.getInstance(DeliveryTestUtil.class);
++    public Injector getInjector() {
++        return injector;
+     }
+ 
+     class TCPCommTestModule extends AbstractModule {
+         TCPCommTestModule() {
+ 
+         }
+ 
+         @Override
+         protected void configure() {
+             bind(Integer.class).annotatedWith(Names.named("emitter.send.interval")).toInstance(100);
+             bind(Integer.class).annotatedWith(Names.named("emitter.send.numMessages")).toInstance(200);
+             bind(Integer.class).annotatedWith(Names.named("listener.recv.sleepCount")).toInstance(10);
+         }
+     }
+ 
 -    /**
 -     * Tests the protocol. If all components function without throwing exceptions, the test passes. The test also
 -     * reports the loss of messages, if any.
 -     * 
 -     * @throws InterruptedException
 -     */
 -    @Test
 -    public void testTCPDelivery() throws InterruptedException {
 -        try {
 -            Thread sendThread = util.newSendThread();
 -            Thread receiveThread = util.newReceiveThread();
 -
 -            // start send and receive threads
 -            sendThread.start();
 -            receiveThread.start();
++    @Override
++    public void testDelivery() throws InterruptedException {
++        startThreads();
++        waitForThreads();
+ 
 -            // wait for them to finish
 -            sendThread.join();
 -            receiveThread.join();
 -
 -            Assert.assertTrue("Guaranteed message delivery", !util.moreMessages(receiveThread));
 -        } catch (Exception e) {
 -            e.printStackTrace();
 -            Assert.fail("TCP has failed basic functionality test");
 -        }
++        Assert.assertTrue("Message Delivery", messageDelivery());
+ 
 -        System.out.println("Done");
++        logger.info("Message ordering - " + messageOrdering());
++        Assert.assertTrue("Pairwise message ordering", messageOrdering());
+     }
+ }

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/121f3306/subprojects/s4-comm/src/test/java/org/apache/s4/comm/udp/MultiPartitionDeliveryTest.java
----------------------------------------------------------------------
diff --cc subprojects/s4-comm/src/test/java/org/apache/s4/comm/udp/MultiPartitionDeliveryTest.java
index 0559b41,0000000..ad030c0
mode 100644,000000..100644
--- a/subprojects/s4-comm/src/test/java/org/apache/s4/comm/udp/MultiPartitionDeliveryTest.java
+++ b/subprojects/s4-comm/src/test/java/org/apache/s4/comm/udp/MultiPartitionDeliveryTest.java
@@@ -1,7 -1,0 +1,14 @@@
 +package org.apache.s4.comm.udp;
 +
- public class MultiPartitionDeliveryTest extends UDPBasedTest {
-     public MultiPartitionDeliveryTest() {
++import java.io.IOException;
++
++/**
++ * UDP delivery test run with two tasks, so that two partitions exchange messages.
++ */
++public class MultiPartitionDeliveryTest extends UDPCommTest {
++    private static final int NUM_TASKS = 2;
++
++    public MultiPartitionDeliveryTest() throws IOException {
-         super(2);
++        super(NUM_TASKS);
 +    }
 +}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/121f3306/subprojects/s4-comm/src/test/java/org/apache/s4/comm/udp/SimpleDeliveryTest.java
----------------------------------------------------------------------
diff --cc subprojects/s4-comm/src/test/java/org/apache/s4/comm/udp/SimpleDeliveryTest.java
index c29f0ca,0000000..41d3017
mode 100644,000000..100644
--- a/subprojects/s4-comm/src/test/java/org/apache/s4/comm/udp/SimpleDeliveryTest.java
+++ b/subprojects/s4-comm/src/test/java/org/apache/s4/comm/udp/SimpleDeliveryTest.java
@@@ -1,7 -1,0 +1,12 @@@
 +package org.apache.s4.comm.udp;
 +
- public class SimpleDeliveryTest extends UDPBasedTest {
-     public SimpleDeliveryTest() {
++import java.io.IOException;
++
++/**
++ * UDP delivery test run with ZkBasedTest's default of a single task.
++ */
++public class SimpleDeliveryTest extends UDPCommTest {
++    public SimpleDeliveryTest() throws IOException {
-         super();
++        // nothing to configure: the implicit super() selects UDPCommTest's default task count
 +    }
 +}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/121f3306/subprojects/s4-comm/src/test/java/org/apache/s4/comm/udp/UDPCommTest.java
----------------------------------------------------------------------
diff --cc subprojects/s4-comm/src/test/java/org/apache/s4/comm/udp/UDPCommTest.java
index 0000000,8f01a9b..90ebc3f
mode 000000,100644..100644
--- a/subprojects/s4-comm/src/test/java/org/apache/s4/comm/udp/UDPCommTest.java
+++ b/subprojects/s4-comm/src/test/java/org/apache/s4/comm/udp/UDPCommTest.java
@@@ -1,0 -1,67 +1,61 @@@
+ package org.apache.s4.comm.udp;
+ 
+ import java.io.IOException;
+ 
+ import org.apache.s4.comm.DefaultCommModule;
 -import org.apache.s4.comm.DeliveryTestUtil;
 -import org.apache.s4.comm.tcp.TCPCommTest;
 -import org.apache.s4.fixtures.ZkBasedTest;
 -import org.apache.zookeeper.KeeperException;
++import org.apache.s4.comm.util.ProtocolTestUtil;
+ import org.junit.Assert;
 -import org.junit.Before;
 -import org.junit.Test;
+ 
+ import com.google.common.io.Resources;
+ import com.google.inject.AbstractModule;
+ import com.google.inject.Guice;
+ import com.google.inject.Injector;
+ import com.google.inject.name.Names;
+ 
 -public class UDPCommTest extends ZkBasedTest {
++public abstract class UDPCommTest extends ProtocolTestUtil {
 -    DeliveryTestUtil util;
++    private Injector injector;
+ 
 -    @Before
 -    public void setup() throws IOException, InterruptedException, KeeperException {
 -        Injector injector = Guice.createInjector(new DefaultCommModule(Resources.getResource("udp.s4.comm.properties")
 -                .openStream(), TCPCommTest.CLUSTER_NAME), new UDPCommTestModule());
 -        util = injector.getInstance(DeliveryTestUtil.class);
++    public UDPCommTest() throws IOException {
++        super();
++        injector = Guice.createInjector(new DefaultCommModule(Resources.getResource("udp.s4.comm.properties")
++                .openStream(), "cluster1"), new UDPCommTestModule());
++    }
++
++    public UDPCommTest(int numTasks) throws IOException {
++        super(numTasks);
++        injector = Guice.createInjector(new DefaultCommModule(Resources.getResource("udp.s4.comm.properties")
++                .openStream(), "cluster1"), new UDPCommTestModule());
++    }
++
++    @Override
++    protected Injector getInjector() throws IOException {
++        return injector;
+     }
+ 
+     class UDPCommTestModule extends AbstractModule {
+         UDPCommTestModule() {
+         }
+ 
+         @Override
+         protected void configure() {
+             bind(Integer.class).annotatedWith(Names.named("emitter.send.interval")).toInstance(100);
+             bind(Integer.class).annotatedWith(Names.named("emitter.send.numMessages")).toInstance(200);
+             bind(Integer.class).annotatedWith(Names.named("listener.recv.sleepCount")).toInstance(10);
+         }
+     }
+ 
+     /**
 -     * Tests the protocol. If all components function without throwing exceptions, the test passes.
++     * Tests the protocol: after a one-second pause, starts the send and receive threads of every partition, waits
++     * for all of them to finish and asserts that every partition received the messages it expected. Message
++     * ordering is not asserted.
+      * 
 -     * @throws InterruptedException
++     * @throws InterruptedException if interrupted while pausing or while waiting for the threads
+      */
 -    @Test
 -    public void testUDPDelivery() throws InterruptedException {
++    @Override
++    public void testDelivery() throws InterruptedException {
 -        try {
 -            Thread sendThread = util.newSendThread();
 -            Thread receiveThread = util.newReceiveThread();
 -
 -            // start send and receive threads
 -            sendThread.start();
 -            receiveThread.start();
 -
 -            // wait for them to finish
 -            sendThread.join();
 -            receiveThread.join();
 -
++        Thread.sleep(1000);
++        startThreads();
++        waitForThreads();
++        Assert.assertTrue("Message Delivery", messageDelivery());
 -        } catch (Exception e) {
 -            e.printStackTrace();
 -            Assert.fail("UDP has failed basic functionality test");
 -        }
 -        System.out.println("Done");
+     }
+ }

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/121f3306/subprojects/s4-comm/src/test/java/org/apache/s4/comm/util/PartitionInfo.java
----------------------------------------------------------------------
diff --cc subprojects/s4-comm/src/test/java/org/apache/s4/comm/util/PartitionInfo.java
index 91fc7ec,0000000..6078f7e
mode 100644,000000..100644
--- a/subprojects/s4-comm/src/test/java/org/apache/s4/comm/util/PartitionInfo.java
+++ b/subprojects/s4-comm/src/test/java/org/apache/s4/comm/util/PartitionInfo.java
@@@ -1,176 -1,0 +1,194 @@@
 +package org.apache.s4.comm.util;
 +
 +import java.util.ArrayList;
 +import java.util.Hashtable;
 +import java.util.List;
 +
 +import org.apache.s4.base.Emitter;
++import org.apache.s4.base.EventMessage;
 +import org.apache.s4.base.Listener;
++import org.apache.s4.base.SerializerDeserializer;
 +import org.slf4j.Logger;
 +import org.slf4j.LoggerFactory;
 +
 +import com.google.inject.Inject;
 +import com.google.inject.name.Named;
 +
 +/**
 + * Test util for communication protocols.
 + * 
 + * <ul>
 + * <li>The util defines Send and Receive Threads</li>
 + * <li>SendThread sends out a pre-defined number of messages to all the partitions</li>
 + * <li>ReceiveThread receives all/most of these messages</li>
-  * <li>To avoid the receiveThread waiting for ever, it spawns a TimerThread that would interrupt after a pre-defined but
-  * long enough interval</li>
++ * <li>Once it stops sending, a SendThread lowers each partition's expected count by the messages it could not deliver
++ * (ProtocolTestUtil.decreaseExpectedMessages), which interrupts that partition's ReceiveThread if it has already
++ * received enough messages</li>
 + * </ul>
 + * 
 + */
 +public class PartitionInfo {
 +    private static final Logger logger = LoggerFactory.getLogger(PartitionInfo.class);
 +    public Emitter emitter;
 +    public Listener listener;
 +    public SendThread sendThread;
 +    public ReceiveThread receiveThread;
 +
 +    private final int numRetries;
 +    private final int retryDelayMs;
 +    private int numMessages;
 +    private int partitionId;
 +    private ProtocolTestUtil ptu;
 +
 +    @Inject
++    SerializerDeserializer serDeser;
++
++    @Inject
 +    public PartitionInfo(Emitter emitter, Listener listener, @Named("comm.retries") int retries,
 +            @Named("comm.retry_delay") int retryDelay, @Named("emitter.send.numMessages") int numMessages) {
 +        this.emitter = emitter;
 +        this.listener = listener;
 +        this.partitionId = this.listener.getPartitionId();
 +        logger.debug("# Partitions = {}; Current partition = {}", this.emitter.getPartitionCount(),
 +                this.listener.getPartitionId());
 +
 +        this.numRetries = retries;
 +        this.retryDelayMs = retryDelay;
 +        this.numMessages = numMessages;
 +        // this.messagesExpected = numMessages * this.emitter.getPartitionCount();
 +
 +        this.sendThread = new SendThread();
 +        this.receiveThread = new ReceiveThread();
 +    }
 +
 +    public void setProtocolTestUtil(ProtocolTestUtil ptu) {
 +        this.ptu = ptu;
 +        this.ptu.expectedMessages[partitionId] = numMessages * this.emitter.getPartitionCount();
 +    }
 +
 +    public class SendThread extends Thread {
 +        public int[] sendCounts = new int[emitter.getPartitionCount()];
 +
 +        public SendThread() {
 +            super("SendThread-" + partitionId);
 +        }
 +
 +        @Override
 +        public void run() {
 +            try {
 +                for (int i = 0; i < numMessages; i++) {
 +                    for (int partition = 0; partition < emitter.getPartitionCount(); partition++) {
-                         byte[] message = new String(partitionId + " " + i).getBytes();
++                        EventMessage message = new EventMessage("app1", "stream1",
++                                new String(partitionId + " " + i).getBytes());
 +                        for (int retries = 0; retries < numRetries; retries++) {
 +                            if (emitter.send(partition, message)) {
 +                                sendCounts[partition]++;
 +                                break;
 +                            }
 +                            Thread.sleep(retryDelayMs);
 +                        }
 +                    }
 +                }
 +            } catch (Exception e) {
-                 e.printStackTrace();
-                 return;
++                // do not return here: the unsent messages still have to be subtracted below, otherwise the
++                // receiving partitions keep waiting for messages that will never arrive
++                logger.error("SendThread " + partitionId + ": sending failed", e);
 +            }
 +
 +            for (int i = 0; i < sendCounts.length; i++) {
 +                if (sendCounts[i] < numMessages) {
 +                    ptu.decreaseExpectedMessages(i, (numMessages - sendCounts[i]));
 +                }
 +            }
 +
 +            logger.debug("Exiting");
 +        }
 +    }
 +
 +    public class ReceiveThread extends Thread {
-         protected int messagesReceived = 0;
++        // written only by this thread, but read by SendThreads via ProtocolTestUtil.decreaseExpectedMessages
++        protected volatile int messagesReceived = 0;
 +        private Hashtable<Integer, List<Integer>> receivedMessages;
 +
 +        ReceiveThread() {
 +            super("ReceiveThread-" + partitionId);
 +            receivedMessages = new Hashtable<Integer, List<Integer>>();
 +        }
 +
 +        @Override
 +        public void run() {
 +            while (messagesReceived < ptu.expectedMessages[partitionId]) {
 +                byte[] message = listener.recv();
 +                if (message == null) {
 +                    logger.error("ReceiveThread {}: received a null message", partitionId);
 +                    break;
 +                }
 +
++                EventMessage deserialized = (EventMessage) serDeser.deserialize(message);
 +                // process and store the message
-                 String msgString = new String(message);
++                String msgString = new String(deserialized.getSerializedEvent());
 +                String[] msgTokens = msgString.split(" ");
 +                Integer senderPartition = Integer.parseInt(msgTokens[0]);
 +                Integer receivedMsg = Integer.parseInt(msgTokens[1]);
 +
 +                if (!receivedMessages.containsKey(senderPartition)) {
 +                    receivedMessages.put(senderPartition, new ArrayList<Integer>());
 +                }
 +
 +                List<Integer> messagesList = receivedMessages.get(senderPartition);
 +
 +                if (messagesList.contains(receivedMsg)) {
 +                    messagesList.remove(receivedMsg);
 +                } else {
 +                    messagesReceived++;
 +                }
 +                messagesList.add(receivedMsg);
 +            }
 +
 +            logger.debug("Exiting");
 +        }
 +
++        /**
++         * Checks that, for every sender, the recorded message ids are strictly increasing. A duplicate is recorded at
++         * the position of its latest arrival (see {@link #run()}).
++         * 
++         * @return false at the first id that is not greater than the one recorded before it, true otherwise
++         */
 +        public boolean orderedDelivery() {
 +            for (List<Integer> messagesList : receivedMessages.values()) {
 +                int lastMsg = -1;
 +                for (Integer msg : messagesList) {
 +                    if (msg <= lastMsg) {
 +                        return false;
 +                    }
++                    // advance the reference point; without this every id was only compared against -1
++                    lastMsg = msg;
 +                }
 +            }
 +            return true;
 +        }
 +
 +        public boolean messageDelivery() {
 +            if (messagesReceived < ptu.expectedMessages[partitionId]) {
 +                printCounts();
 +                return false;
 +            } else
 +                return true;
 +        }
 +
 +        public void printCounts() {
 +            logger.debug("ReceiveThread {}: Messages not received = {}", partitionId,
 +                    (ptu.expectedMessages[partitionId] - messagesReceived));
 +            int counts[] = new int[emitter.getPartitionCount()];
 +            for (Integer sender : receivedMessages.keySet()) {
 +                counts[sender] = receivedMessages.get(sender).size();
 +            }
 +
 +            logger.debug("ReceiveThread {}: recvdCounts: {}", partitionId, counts);
 +        }
 +
 +        public int moreMessages() {
 +            return (int) (ptu.expectedMessages[partitionId] - messagesReceived);
 +        }
 +    }
 +}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/121f3306/subprojects/s4-comm/src/test/java/org/apache/s4/comm/util/ProtocolTestUtil.java
----------------------------------------------------------------------
diff --cc subprojects/s4-comm/src/test/java/org/apache/s4/comm/util/ProtocolTestUtil.java
index bc4f166,0000000..421aacd
mode 100644,000000..100644
--- a/subprojects/s4-comm/src/test/java/org/apache/s4/comm/util/ProtocolTestUtil.java
+++ b/subprojects/s4-comm/src/test/java/org/apache/s4/comm/util/ProtocolTestUtil.java
@@@ -1,97 -1,0 +1,107 @@@
 +package org.apache.s4.comm.util;
 +
 +import java.io.IOException;
 +
 +import org.apache.s4.fixtures.ZkBasedTest;
 +import org.apache.zookeeper.KeeperException;
 +import org.junit.After;
 +import org.junit.Before;
 +import org.junit.Test;
 +
 +import com.google.inject.Injector;
 +
 +public abstract class ProtocolTestUtil extends ZkBasedTest {
 +    protected int[] expectedMessages;
-     protected Injector injector;
 +    protected PartitionInfo[] partitions;
 +
 +    protected ProtocolTestUtil() {
 +        super();
 +    }
 +
 +    protected ProtocolTestUtil(int numTasks) {
 +        super(numTasks);
 +    }
 +
 +    @Before
-     public void setup() throws IOException, InterruptedException, KeeperException {
++    public void preparePartitions() throws IOException, InterruptedException, KeeperException {
 +        expectedMessages = new int[super.numTasks];
 +        partitions = new PartitionInfo[super.numTasks];
 +        for (int i = 0; i < this.numTasks; i++) {
-             partitions[i] = injector.getInstance(PartitionInfo.class);
++            partitions[i] = getInjector().getInstance(PartitionInfo.class);
 +            partitions[i].setProtocolTestUtil(this);
 +        }
 +    }
 +
++    protected abstract Injector getInjector() throws IOException;
++
 +    protected void decreaseExpectedMessages(int partition, long amount) {
 +        synchronized (expectedMessages) {
 +            expectedMessages[partition] -= amount;
 +        }
 +
 +        if (partitions[partition].receiveThread.messagesReceived >= expectedMessages[partition])
 +            interrupt(partition);
 +    }
 +
 +    protected void interrupt(int partition) {
 +        partitions[partition].receiveThread.interrupt();
 +    }
 +
 +    protected void startThreads() {
 +        for (PartitionInfo partition : partitions) {
 +            partition.sendThread.start();
 +            partition.receiveThread.start();
 +        }
 +    }
 +
 +    protected void waitForThreads() throws InterruptedException {
 +        for (PartitionInfo partition : partitions) {
 +            partition.sendThread.join();
 +            partition.receiveThread.join();
 +        }
 +    }
 +
 +    protected boolean messageDelivery() {
 +        for (PartitionInfo partition : partitions) {
 +            if (!partition.receiveThread.messageDelivery())
 +                return false;
 +        }
 +        return true;
 +    }
 +
 +    protected boolean messageOrdering() {
 +        for (PartitionInfo partition : partitions) {
 +            if (!partition.receiveThread.orderedDelivery())
 +                return false;
 +        }
 +        return true;
 +    }
 +
 +    @After
 +    public void tearDown() {
++        // JUnit runs @After methods even when a @Before method failed (ZkBasedTest.prepare before preparePartitions
++        // ran, or getInjector() throwing half-way through the loop): skip what was never created so the original
++        // failure is not masked by a NullPointerException.
++        if (partitions == null) {
++            return;
++        }
 +        for (PartitionInfo partition : partitions) {
++            if (partition == null) {
++                continue;
++            }
 +            // debug
 +            partition.receiveThread.printCounts();
 +            if (partition.emitter != null) {
 +                partition.emitter.close();
 +                partition.emitter = null;
 +            }
 +            if (partition.listener != null) {
 +                partition.listener.close();
 +                partition.listener = null;
 +            }
 +        }
 +    }
 +
 +    @Test(timeout = 60000)
 +    public abstract void testDelivery() throws InterruptedException;
 +}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/121f3306/subprojects/s4-comm/src/test/java/org/apache/s4/fixtures/ZkBasedTest.java
----------------------------------------------------------------------
diff --cc subprojects/s4-comm/src/test/java/org/apache/s4/fixtures/ZkBasedTest.java
index 3b1e45a,2f336e0..fdd607b
--- a/subprojects/s4-comm/src/test/java/org/apache/s4/fixtures/ZkBasedTest.java
+++ b/subprojects/s4-comm/src/test/java/org/apache/s4/fixtures/ZkBasedTest.java
@@@ -13,40 -12,17 +12,27 @@@ import org.slf4j.LoggerFactory
  
  public abstract class ZkBasedTest {
      private static final Logger logger = LoggerFactory.getLogger(ZkBasedTest.class);
-     private ZkServer zkServer;
-     protected String zookeeperAddress = "localhost:" + CommTestUtils.ZK_PORT;
 +
-     private static final String clusterName = "s4-test-cluster";
+     private Factory zkFactory;
 +    protected final int numTasks;
 +
 +    protected ZkBasedTest() {
 +        this.numTasks = 1;
 +    }
 +
 +    protected ZkBasedTest(int numTasks) {
 +        this.numTasks = numTasks;
 +    }
  
      @Before
-     public void setupZkBasedTest() {
-         String dataDir = CommTestUtils.DEFAULT_TEST_OUTPUT_DIR + File.separator + "zookeeper" + File.separator + "data";
-         String logDir = CommTestUtils.DEFAULT_TEST_OUTPUT_DIR + File.separator + "zookeeper" + File.separator + "logs";
+     public void prepare() throws IOException, InterruptedException, KeeperException {
          CommTestUtils.cleanupTmpDirs();
  
-         IDefaultNameSpace defaultNameSpace = new IDefaultNameSpace() {
-             @Override
-             public void createDefaultNameSpace(ZkClient zkClient) {
+         zkFactory = CommTestUtils.startZookeeperServer();
  
-             }
-         };
- 
-         logger.info("Starting Zookeeper Server");
-         zkServer = new ZkServer(dataDir, logDir, defaultNameSpace, CommTestUtils.ZK_PORT);
-         zkServer.start();
- 
-         TaskSetup taskSetup = new TaskSetup(zookeeperAddress);
-         taskSetup.clean(clusterName);
-         taskSetup.setup(clusterName, this.numTasks);
+         TaskSetup taskSetup = new TaskSetup("localhost:" + CommTestUtils.ZK_PORT);
+         taskSetup.clean("s4");
 -        taskSetup.setup("cluster1", 1, 1300);
++        taskSetup.setup("cluster1", numTasks, 1300);
      }
  
      @After

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/121f3306/subprojects/s4-comm/src/test/resources/udp.s4.comm.properties
----------------------------------------------------------------------
diff --cc subprojects/s4-comm/src/test/resources/udp.s4.comm.properties
index 0000000,615f114..a5367c7
mode 000000,100644..100644
--- a/subprojects/s4-comm/src/test/resources/udp.s4.comm.properties
+++ b/subprojects/s4-comm/src/test/resources/udp.s4.comm.properties
@@@ -1,0 -1,9 +1,13 @@@
+ comm.queue_emmiter_size = 8000
+ comm.queue_listener_size = 8000
+ comm.emitter.class = org.apache.s4.comm.udp.UDPEmitter
+ comm.emitter.remote.class = org.apache.s4.comm.udp.UDPRemoteEmitter
+ comm.listener.class = org.apache.s4.comm.udp.UDPListener
+ cluster.name = cluster1
+ cluster.zk_address = localhost:2181
+ cluster.zk_session_timeout = 10000
 -cluster.zk_connection_timeout = 10000
++cluster.zk_connection_timeout = 10000
++# SendThread retry settings read by PartitionInfo: attempts per message, and pause between attempts in ms
++comm.retries = 10
++comm.retry_delay = 10
++comm.timeout = 1000

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/121f3306/subprojects/s4-core/s4-core.gradle
----------------------------------------------------------------------
diff --cc subprojects/s4-core/s4-core.gradle
index 3aa1d03,46d580d..fffab81
--- a/subprojects/s4-core/s4-core.gradle
+++ b/subprojects/s4-core/s4-core.gradle
@@@ -19,22 -19,15 +19,15 @@@ description = 'The S4 core platform.
  dependencies {
      compile project(":s4-base")
      compile project(":s4-comm")
-     testCompile project(path: ':s4-comm', configuration: 'tests')
- }
- 
- task testJar(type: Jar) {
-     baseName = "test-${project.archivesBaseName}"
-     from sourceSets.test.classes
- }
- 
- configurations {
-     tests
- }
- 
- artifacts {
-     tests testJar
+     compile project(path: ':s4-comm', configuration: 'tests')
+     compile libraries.jcommander
 -    testCompile project(path: ':s4-comm', configuration: 'tests')
++    // s4-comm's 'tests' artifacts reach the test classpath through the compile dependency above
+     testCompile libraries.gradle_base_services
+     testCompile libraries.gradle_core
+     testCompile libraries.gradle_tooling_api
 -    testCompile libraries.gradle_wrapper
++    testCompile libraries.gradle_wrapper
  }
  
  test {
-     forkEvery=1
- }
 -    forkEvery=1;
++    forkEvery = 1
 -}
++}