You are viewing a plain text version of this content. The canonical link for it is here.
Posted to s4-commits@incubator.apache.org by mm...@apache.org on 2012/11/14 17:15:25 UTC

[1/3] git commit: Performance improvements and configurability - clearly identify asynchronous stages and use configurable and injectable executors for each of them (deserialization, processing, serialization) - default executors for processing/sending u

Updated Branches:
  refs/heads/S4-95 3c450d4a3 -> f9689ea00


Performance improvements and configurability
- clearly identify asynchronous stages and use configurable and injectable executors
for each of them (deserialization, processing, serialization)
- default executors for processing/sending use throttling
- stream processing now parallelizable, per stream
- Sender and Receiver are now interfaces, part of the base API
- SenderImpl and ReceiverImpl are default implementations in s4-core
- simplified regression tests for communication protocols


Project: http://git-wip-us.apache.org/repos/asf/incubator-s4/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-s4/commit/f9689ea0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-s4/tree/f9689ea0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-s4/diff/f9689ea0

Branch: refs/heads/S4-95
Commit: f9689ea0055e1d7a2a8459b8ee380283767f0ac8
Parents: f04bd09
Author: Matthieu Morel <mm...@apache.org>
Authored: Mon Nov 12 11:28:25 2012 +0100
Committer: Matthieu Morel <mm...@apache.org>
Committed: Wed Nov 14 17:48:22 2012 +0100

----------------------------------------------------------------------
 build.gradle                                       |   10 +-
 lib/reflectasm-1.07-shaded.jar                     |  Bin 0 -> 65612 bytes
 .../src/main/java/org/apache/s4/base/Listener.java |   15 +-
 .../src/main/java/org/apache/s4/base/Receiver.java |   24 ++
 .../src/main/java/org/apache/s4/base/Sender.java   |   32 +++
 .../main/java/org/apache/s4/base/package-info.java |    3 +
 subprojects/s4-benchmarks/README.md                |   29 ++-
 subprojects/s4-benchmarks/bench-cluster.sh         |   45 +++-
 subprojects/s4-benchmarks/config/injector.config   |    2 +-
 subprojects/s4-benchmarks/config/node.config       |    1 -
 .../apache/s4/benchmark/simpleApp1/Injector.java   |   78 +------
 .../apache/s4/benchmark/simpleApp1/SimpleApp.java  |   23 +-
 .../apache/s4/benchmark/simpleApp1/SimplePE1.java  |   70 +-----
 .../apache/s4/benchmark/simpleApp1/SimplePE2.java  |   30 +++
 subprojects/s4-benchmarks/startInjector.sh         |   19 ++
 subprojects/s4-benchmarks/startNode.sh             |   14 +-
 .../java/org/apache/s4/comm/DefaultCommModule.java |    1 +
 .../comm/DefaultDeserializerExecutorFactory.java   |   34 +++
 .../s4/comm/DeserializerExecutorFactory.java       |   13 +
 .../comm/ThrottlingThreadPoolExecutorService.java  |  185 +++++++++++++
 .../org/apache/s4/comm/serialize/KryoSerDeser.java |    3 +-
 .../java/org/apache/s4/comm/tcp/TCPEmitter.java    |    1 -
 .../java/org/apache/s4/comm/tcp/TCPListener.java   |   43 +--
 .../java/org/apache/s4/comm/udp/UDPListener.java   |   20 +-
 .../src/main/resources/default.s4.comm.properties  |    9 +-
 .../java/org/apache/s4/comm/DeliveryTestUtil.java  |  162 ------------
 .../s4/comm/tcp/MultiPartitionDeliveryTest.java    |   34 ---
 .../org/apache/s4/comm/tcp/SimpleDeliveryTest.java |   34 ---
 .../java/org/apache/s4/comm/tcp/TCPBasicTest.java  |   49 ++++
 .../java/org/apache/s4/comm/tcp/TCPCommTest.java   |   68 -----
 .../s4/comm/topology/AssignmentsFromZKTest1.java   |    3 +-
 .../s4/comm/topology/ClustersFromZKTest.java       |    3 +-
 .../org/apache/s4/comm/topology/ZKBaseTest.java    |   44 ---
 .../s4/comm/udp/MultiPartitionDeliveryTest.java    |   27 --
 .../org/apache/s4/comm/udp/SimpleDeliveryTest.java |   27 --
 .../java/org/apache/s4/comm/udp/UDPBasicTest.java  |   48 ++++
 .../java/org/apache/s4/comm/udp/UDPCommTest.java   |   79 ------
 .../org/apache/s4/comm/util/PartitionInfo.java     |  194 --------------
 .../org/apache/s4/comm/util/ProtocolTestUtil.java  |  116 --------
 .../java/org/apache/s4/fixtures/CommTestUtils.java |    3 +
 .../java/org/apache/s4/fixtures/MockReceiver.java  |   38 +++
 .../org/apache/s4/fixtures/MockReceiverModule.java |   27 ++
 .../java/org/apache/s4/fixtures/NoOpReceiver.java  |   22 ++
 .../org/apache/s4/fixtures/NoOpReceiverModule.java |   27 ++
 .../src/test/resources/udp.s4.comm.properties      |    7 +
 subprojects/s4-core/s4-core.gradle                 |    1 +
 .../src/main/java/org/apache/s4/core/App.java      |   13 +-
 .../java/org/apache/s4/core/DefaultCoreModule.java |   17 ++
 .../java/org/apache/s4/core/ProcessingElement.java |    5 +-
 .../src/main/java/org/apache/s4/core/Receiver.java |  122 ---------
 .../main/java/org/apache/s4/core/ReceiverImpl.java |  114 ++++++++
 .../java/org/apache/s4/core/RemoteSenders.java     |   38 +++-
 .../src/main/java/org/apache/s4/core/Sender.java   |  126 ---------
 .../main/java/org/apache/s4/core/SenderImpl.java   |  166 ++++++++++++
 .../src/main/java/org/apache/s4/core/Stream.java   |  204 ++++++++-------
 .../ft/FileSystemBackendCheckpointingModule.java   |    3 +-
 ...DefaultRemoteSendersExecutorServiceFactory.java |   20 ++
 .../DefaultSenderExecutorServiceFactory.java       |   34 +++
 ...aultStreamProcessingExecutorServiceFactory.java |   37 +++
 .../RemoteSendersExecutorServiceFactory.java       |   12 +
 .../core/staging/SenderExecutorServiceFactory.java |   13 +
 .../core/staging/StreamExecutorServiceFactory.java |   29 ++
 .../java/org/apache/s4/core/util/S4Metrics.java    |   22 +-
 .../org/apache/s4/core/ft/CheckpointingTest.java   |    1 +
 .../org/apache/s4/core/windowing/WindowingPE1.java |    1 -
 .../org/apache/s4/fixtures/MockCoreModule.java     |   23 ++-
 .../java/org/apache/s4/example/counter/MyApp.java  |    9 +-
 67 files changed, 1342 insertions(+), 1384 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/f9689ea0/build.gradle
----------------------------------------------------------------------
diff --git a/build.gradle b/build.gradle
index 9755259..89113f2 100644
--- a/build.gradle
+++ b/build.gradle
@@ -58,10 +58,11 @@ project.ext["libraries"] = [
     guice:              'com.google.inject:guice:3.0',
     aop_alliance:       'aopalliance:aopalliance:1.0',
     guice_assist:       'com.google.inject.extensions:guice-assistedinject:3.0',
-    kryo:               'com.esotericsoftware.kryo:kryo:2.17',
+    kryo:               'com.esotericsoftware.kryo:kryo:2.20',
     minlog:             'com.googlecode:minlog:1.2',
-    reflectasm:         'com.esotericsoftware.reflectasm:reflectasm:1.07',
-    netty:              'org.jboss.netty:netty:3.2.5.Final',
+    // NOTE shaded jar is not resolved correctly, we include it in /lib directory
+    reflectasm:         'com.esotericsoftware.reflectasm:reflectasm:1.07-shaded',
+    netty:              'io.netty:netty:3.5.9.Final',
     mockito_core:       'org.mockito:mockito-core:1.9.0',
     commons_config:     'commons-configuration:commons-configuration:1.6',
     commons_codec:      'commons-codec:commons-codec:1.4',
@@ -145,9 +146,10 @@ subprojects {
         compile(libraries.reflectasm)
         runtime(libraries.objenesis)
         runtime(libraries.kryo)
+        runtime(libraries.reflectasm)
         runtime(libraries.netty)
         runtime(libraries.asm)
-        runtime(libraries.javax_inject)
+        compile(libraries.javax_inject)
         runtime(libraries.commons_codec)
     }
     

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/f9689ea0/lib/reflectasm-1.07-shaded.jar
----------------------------------------------------------------------
diff --git a/lib/reflectasm-1.07-shaded.jar b/lib/reflectasm-1.07-shaded.jar
new file mode 100644
index 0000000..1e182c1
Binary files /dev/null and b/lib/reflectasm-1.07-shaded.jar differ

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/f9689ea0/subprojects/s4-base/src/main/java/org/apache/s4/base/Listener.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-base/src/main/java/org/apache/s4/base/Listener.java b/subprojects/s4-base/src/main/java/org/apache/s4/base/Listener.java
index 98a2c60..d869e33 100644
--- a/subprojects/s4-base/src/main/java/org/apache/s4/base/Listener.java
+++ b/subprojects/s4-base/src/main/java/org/apache/s4/base/Listener.java
@@ -18,25 +18,18 @@
 
 package org.apache.s4.base;
 
-import java.nio.ByteBuffer;
-
 /**
  * 
- * Get a byte array received by a lower level layer.
+ * Defines the communication level entry point for serialized events. The implementation is expected to open a server
+ * socket on the node's listening port, receive messages through this channel, and delegate to the application layer
+ * through the {@link Receiver} interface.
  * 
  */
 public interface Listener {
 
     /**
-     * Perform blocking receive on the appropriate communication channel
-     * 
-     * @return <ul>
-     *         <li>byte[] message returned by the channel</li>
-     *         <li>null if the associated blocking thread is interrupted</li>
-     *         </ul>
+     * Returns the id of the partition currently assigned to this node.
      */
-    ByteBuffer recv();
-
     public int getPartitionId();
 
     void close();

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/f9689ea0/subprojects/s4-base/src/main/java/org/apache/s4/base/Receiver.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-base/src/main/java/org/apache/s4/base/Receiver.java b/subprojects/s4-base/src/main/java/org/apache/s4/base/Receiver.java
new file mode 100644
index 0000000..be90722
--- /dev/null
+++ b/subprojects/s4-base/src/main/java/org/apache/s4/base/Receiver.java
@@ -0,0 +1,24 @@
+package org.apache.s4.base;
+
+import java.nio.ByteBuffer;
+
+/**
+ * Defines the entry point from the communication layer to the application layer.
+ * 
+ * Events received as raw bytes through the {@link Listener} implementation use the {@link Receiver#receive(ByteBuffer)}
+ * method so that events can be deserialized (conversion from byte[] to Event objects) and enqueued for processing.
+ * 
+ */
+public interface Receiver {
+
+    /**
+     * Handle a serialized message, i.e. deserialize the message and pass it to event processors.
+     */
+    void receive(ByteBuffer message);
+
+    /**
+     * Returns the partition id currently assigned to this node.
+     */
+    int getPartitionId();
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/f9689ea0/subprojects/s4-base/src/main/java/org/apache/s4/base/Sender.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-base/src/main/java/org/apache/s4/base/Sender.java b/subprojects/s4-base/src/main/java/org/apache/s4/base/Sender.java
new file mode 100644
index 0000000..c69cc45
--- /dev/null
+++ b/subprojects/s4-base/src/main/java/org/apache/s4/base/Sender.java
@@ -0,0 +1,32 @@
+package org.apache.s4.base;
+
+/**
+ * Defines the exit point from the event processing layer to the communication layer. The implementation is responsible
+ * for serializing events and passing serialized data to the communication layer.
+ * 
+ */
+public interface Sender {
+
+    /**
+     * This method attempts to send an event to a remote partition. If the destination is local, the method does not
+     * send the event and returns false. <b>The caller is then expected to put the event in a local queue instead.</b>
+     * 
+     * @param hashKey
+     *            the string used to map the value of a key to a specific partition.
+     * @param event
+     *            the event to be delivered to a Processing Element instance.
+     * @return true if the event was sent because the destination is <b>not</b> local.
+     * 
+     */
+    boolean checkAndSendIfNotLocal(String hashKey, Event event);
+
+    /**
+     * Send an event to all the remote partitions in the cluster. The caller is expected to also put the event in a
+     * local queue.
+     * 
+     * @param event
+     *            the event to be delivered to Processing Element instances.
+     */
+    void sendToAllRemotePartitions(Event event);
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/f9689ea0/subprojects/s4-base/src/main/java/org/apache/s4/base/package-info.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-base/src/main/java/org/apache/s4/base/package-info.java b/subprojects/s4-base/src/main/java/org/apache/s4/base/package-info.java
index 5829e0c..d068a66 100644
--- a/subprojects/s4-base/src/main/java/org/apache/s4/base/package-info.java
+++ b/subprojects/s4-base/src/main/java/org/apache/s4/base/package-info.java
@@ -18,5 +18,8 @@
 
 /**
  * Defines some of the basic elements of the S4 platforms.
+ * 
+ * 
+ * 
  */
 package org.apache.s4.base;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/f9689ea0/subprojects/s4-benchmarks/README.md
----------------------------------------------------------------------
diff --git a/subprojects/s4-benchmarks/README.md b/subprojects/s4-benchmarks/README.md
index 9fa3f21..c5bea72 100644
--- a/subprojects/s4-benchmarks/README.md
+++ b/subprojects/s4-benchmarks/README.md
@@ -9,19 +9,19 @@ That said, let's look at what the benchmarking framework does and how to use it.
 
 ## Measurements
 
-The benchmarking framework consists of a multithreaded injector and an application. App nodes and injector are launched directly, there is no deployment step. This allows to skip the packaging and deployment steps and allows to easily add profiling parameters, but requires a source distribution and a shared file system.
+The benchmarking framework consists of a multithreaded injector and an application. App nodes and injector are launched directly; there is no deployment step. This makes it possible to skip the packaging and deployment steps and to easily add profiling parameters, but it requires a source distribution and a shared file system.
 
-The simplest application does nothing but count incoming keyed messages, on a single stream, but other simple application can be easily added. For instance, with multiple streams, and communicating PEs.
+The simplest application does nothing but count incoming keyed messages, but other simple applications can easily be added, in particular ones involving multiple communicating PEs. There are 2 input streams available: `inputStream` and `inputStream2`.
 
-The injector sends basic keyed messages. The outputstream of the injector uses a keyfinder to partition the events across the application nodes.
+The injector sends basic keyed messages to a given named stream. The outputstream of the injector uses a keyfinder to partition the events across the application nodes.
 
 We get metrics from the probes across the codebase, in particular:
 - the rate of events sent per second (in the injector)
 - the rate of events received per second (in the app nodes)
 
-Metrics from the platform code are computed with weighted moving averages.
+Metrics from the platform code are computed with weighted moving averages. It is recommended to let the application run for a few minutes and observe the metrics from the last minute.
 
-Profiling options can easily be added to the injector or app nodes in order to identify hotspots.
+Profiling options (e.g. YourKit) can easily be added to the injector or app nodes in order to identify hotspots.
 
 ## Parameters
 
@@ -38,9 +38,9 @@ Input parameters are:
 Exmample configuration files are available in `/config` and you can configure :
 
 - the number of keys
-- the number of warmup iterations
 - the number of test iterations
 - the number of parallel injection threads
+- the number of threads for the sender stage
 - etc…
 
 By default in this example the size of a message is 188 bytes.
@@ -53,15 +53,24 @@ Running 2 S4 nodes on the local machine:
 
 For a distributed setup, you should modify the host names in the above command line, and specify the correct Zookeeper connection string in `node.config`.
 
+Here is an example for driving the test on a cluster:
+`incubator-s4/subprojects/s4-benchmarks/bench-cluster.sh "processingHost1 processingHost2 processingHost3" "injectorConfigStream1.cfg injectorConfigStream2.cfg" node.cfg 2 "injectorHost1 injectorHost2 injectorHost3 injectorHost4"` (the `2` controls the number of injectors per stream per injector host)
+
 ## Results
 
 
-When the benchmark finishes, results are available in `measurements/injectors` for the injection rates and in `measurements/node[0-n]` for other statistics.
+When the benchmark finishes (and even during the execution), results are available in `measurements/injectors` for the injection rates and in `measurements/node[0-n]` for other statistics.
+
+Results are also available from the console output for each of the nodes.
 
 Most statistics files come from the probes of the platform and some of them use weighted moving averages. These are good for long running applications. For the benchmarks we also show instant rates, which are available in `injection-rate.csv` and `simplePE1.csv` files.
 
 ## Notes
 
-In the current design of S4, messages sent to output streams are not queued by S4 and directly passed to the communication layer.
-
-This implies that if the communication layer is not able to send those messages at least as fast as they are generated, the buffer of pending messages will increase rapidly. This may lead to memory problems in the injector. Solving the problem requires tuning the number of parallel injection threads.
+There are a lot of knobs for optimally configuring the stages, and the optimal settings will also depend upon:
+- the hardware
+- the network
+- the operating system (scheduling, networking)
+- the JVM implementation and tuning parameters
+- the application
+- the skewness and diversity of the data (there is a maximum of 1 event executing in a given PE instance (i.e. keyed))
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/f9689ea0/subprojects/s4-benchmarks/bench-cluster.sh
----------------------------------------------------------------------
diff --git a/subprojects/s4-benchmarks/bench-cluster.sh b/subprojects/s4-benchmarks/bench-cluster.sh
index 654f3c1..d2d4e36 100755
--- a/subprojects/s4-benchmarks/bench-cluster.sh
+++ b/subprojects/s4-benchmarks/bench-cluster.sh
@@ -2,9 +2,10 @@
 
 
 HOSTS=$1
-INJECTOR_CONFIG=$2
+INJECTOR_CONFIGS=$2 # 1 injector injects to 1 stream. Use more injector configs for injecting to more streams
 NODE_CONFIG=$3
-NB_INJECTORS=$4
+NB_INJECTORS_PER_NODE=$4
+INJECTOR_NODES=$5
 BENCH_ROOTDIR=`pwd`
 
 echo "hosts = $HOSTS"
@@ -12,6 +13,10 @@ echo "injector config file = $INJECTOR_CONFIG"
 echo "node config file = $NODE_CONFIG"
 echo "bench root dir = $BENCH_ROOTDIR"
 
+#########################################################
+#### cleanup files and processes, and build platform
+#########################################################
+
 killall -9 java
 
 cd $BENCH_ROOTDIR
@@ -25,8 +30,18 @@ NB_NODES=0
 for host in $HOSTS
 do
 	((NB_NODES++))
+	ssh $host "killall -9 java"
+done
+
+NB_INJECTORS=0
+for injectorNode in $INJECTOR_NODES ; do
+	for INJECTOR_CONFIG in $INJECTOR_CONFIGS ; do
+		NB_INJECTORS=$(($NB_INJECTORS + $NB_INJECTORS_PER_NODE))
+	done
+	ssh $injectorNode "killall -9 java"
 done
 
+# must run from where ZooKeeper server is running (as specified in injector config file)
 (cd $BENCH_ROOTDIR/../../ && ./s4 zkServer -clusters=c=testCluster1:flp=12000:nbTasks=$NB_INJECTORS,c=testCluster2:flp=13000:nbTasks=$NB_NODES &)
 
 
@@ -40,22 +55,38 @@ mkdir $BENCH
 echo "nb nodes = $NB_NODES\n" > $BENCH/benchConf.txt
 echo "hosts = $HOSTS" >> $BENCH/benchConf.txt
 echo "injector config ">> $BENCH/benchConf.txt
-cat $INJECTOR_CONFIG >> $BENCH/benchConf.txt
+for INJECTOR_CONFIG in $INJECTOR_CONFIGS ; do
+	cat $INJECTOR_CONFIG >> $BENCH/benchConf.txt
+done
+
 
+#########################################################
+#### start S4 nodes
+#########################################################
+
+i=0
 for host in $HOSTS
 do
+  ((i++))
   if [ $host == "localhost" ] || [ $host == "127.0.0.1" ] ; then
     $BENCH_ROOTDIR/startNode.sh $BENCH_ROOTDIR $NODE_CONFIG "localhost" > $BENCH_DIR/output_$i.log 2>$BENCH_DIR/s4err_$i.err < /dev/null &
   else
-    ssh $host "$BENCH_ROOTDIR/startNode.sh $BENCH_ROOTDIR $NODE_CONFIG $host > $BENCH_DIR/output_$host.log 2>$BENCH_DIR/s4err_$host	.err < /dev/null &"
+    ssh $host "$BENCH_ROOTDIR/startNode.sh $BENCH_ROOTDIR $NODE_CONFIG $host > $BENCH_DIR/output_$host-$i.log 2>$BENCH_DIR/s4err_$host-$i.err < /dev/null &"
   fi
 done
 
 sleep 15
 
-for ((i = 1; i <= $NB_INJECTORS; i++)); do
-	java -Xmx200m -Xms200m -verbose:gc -Xloggc:gc.log -XX:+PrintGCDetails -XX:+PrintGCTimeStamps -cp `cat classpath.txt` org.apache.s4.core.Main "@$INJECTOR_CONFIG" &
-done
+PROFILING_OPTS=""
 
+#########################################################
+#### start injectors
+#########################################################
+
+for INJECTOR_NODE in $INJECTOR_NODES ; do
+	for INJECTOR_CONFIG in $INJECTOR_CONFIGS ; do
+		ssh $INJECTOR_NODE "cd $BENCH_ROOTDIR ; $BENCH_ROOTDIR/startInjector.sh $NB_INJECTORS_PER_NODE $INJECTOR_CONFIG $ZK_SERVER > $BENCH_DIR/out.injector_$INJECTOR_NODE.log 2>$BENCH_DIR/err.injector_$INJECTOR_NODE.log < /dev/null &"
+	done
+done
 
 

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/f9689ea0/subprojects/s4-benchmarks/config/injector.config
----------------------------------------------------------------------
diff --git a/subprojects/s4-benchmarks/config/injector.config b/subprojects/s4-benchmarks/config/injector.config
index c189b55..507e1e3 100644
--- a/subprojects/s4-benchmarks/config/injector.config
+++ b/subprojects/s4-benchmarks/config/injector.config
@@ -1,3 +1,3 @@
 -c=testCluster1
 -appClass=org.apache.s4.benchmark.simpleApp1.Injector
--p=s4.adapter.output.stream=inputStream,s4.benchmark.keysCount=2,s4.benchmark.warmupIterations=100000,s4.benchmark.testIterations=1000000,s4.benchmark.testSleepInterval=0,s4.benchmark.warmupSleepInterval=0,s4.benchmark.injector.parallelism=2
+-p=s4.sender.parallelism=4,s4.adapter.output.stream=inputStream,s4.benchmark.keysCount=100000,s4.benchmark.testIterations=1000000,s4.benchmark.testSleepInterval=0,s4.benchmark.injector.parallelism=2

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/f9689ea0/subprojects/s4-benchmarks/config/node.config
----------------------------------------------------------------------
diff --git a/subprojects/s4-benchmarks/config/node.config b/subprojects/s4-benchmarks/config/node.config
index a5b83b2..cb0e4c2 100644
--- a/subprojects/s4-benchmarks/config/node.config
+++ b/subprojects/s4-benchmarks/config/node.config
@@ -1,4 +1,3 @@
 -c=testCluster2
 -appClass=org.apache.s4.benchmark.simpleApp1.SimpleApp
--p=s4.adapter.output.stream=inputStream
 -zk=127.0.0.1:2181
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/f9689ea0/subprojects/s4-benchmarks/src/main/java/org/apache/s4/benchmark/simpleApp1/Injector.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-benchmarks/src/main/java/org/apache/s4/benchmark/simpleApp1/Injector.java b/subprojects/s4-benchmarks/src/main/java/org/apache/s4/benchmark/simpleApp1/Injector.java
index e2fd079..5463703 100644
--- a/subprojects/s4-benchmarks/src/main/java/org/apache/s4/benchmark/simpleApp1/Injector.java
+++ b/subprojects/s4-benchmarks/src/main/java/org/apache/s4/benchmark/simpleApp1/Injector.java
@@ -4,7 +4,6 @@ import java.io.File;
 import java.math.BigDecimal;
 import java.math.MathContext;
 import java.util.List;
-import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
@@ -12,10 +11,6 @@ import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.s4.base.Event;
 import org.apache.s4.base.KeyFinder;
-import org.apache.s4.benchmark.utils.Utils;
-import org.apache.s4.comm.topology.ZNRecord;
-import org.apache.s4.comm.topology.ZNRecordSerializer;
-import org.apache.s4.comm.topology.ZkClient;
 import org.apache.s4.core.RemoteStream;
 import org.apache.s4.core.adapter.AdapterApp;
 import org.slf4j.Logger;
@@ -84,22 +79,6 @@ public class Injector extends AdapterApp {
         };
         super.onInit();
         ConsoleReporter.enable(30, TimeUnit.SECONDS);
-        // Metrics.shutdown();
-        ZkClient zkClient = new ZkClient(zkString);
-        if (getReceiver().getPartition() == 0) {
-            zkClient.createPersistent("/benchmarkConfig");
-            zkClient.createPersistent("/benchmarkConfig/warmupIterations", warmupIterations * parallelism);
-            zkClient.createPersistent("/benchmarkConfig/testIterations", testIterations * parallelism);
-            zkClient.createPersistent("/warmup");
-            zkClient.createPersistent("/test");
-        }
-
-        if (!(getReceiver().getPartition() == 0)) {
-            zkClient.waitUntilExists("/test", TimeUnit.SECONDS, 10);
-        }
-        zkClient.createPersistent("/warmup/injector-" + getReceiver().getPartition());
-        zkClient.createPersistent("/test/injector-" + getReceiver().getPartition());
-        zkClient.close();
     }
 
     @Override
@@ -131,47 +110,13 @@ public class Injector extends AdapterApp {
             }
         });
 
-        CountDownLatch signalWarmupComplete = Utils.getReadySignal(zkString, "/warmup/injector-"
-                + getReceiver().getPartition(), keysCount);
-
         RemoteStream remoteStream = getRemoteStream();
-        generateEvents(remoteStream, warmupIterations, keysCount, warmupSleepInterval, parallelism);
-
-        generateStopEvent(remoteStream, -1, keysCount);
-
-        // now that we are certain app nodes are connected, check the target cluster
-        ZkClient zkClient = new ZkClient(zkString);
-        zkClient.setZkSerializer(new ZNRecordSerializer());
-
-        ZNRecord readData = zkClient.readData("/s4/streams/" + getRemoteStream().getName() + "/consumers/"
-                + zkClient.getChildren("/s4/streams/" + getRemoteStream().getName() + "/consumers").get(0));
-        String remoteClusterName = readData.getSimpleField("clusterName");
 
-        int appPartitionCount = zkClient.countChildren("/s4/clusters/" + remoteClusterName + "/tasks");
-        zkClient.close();
-        CountDownLatch signalBenchComplete = Utils.getReadySignal(zkString, "/test/injector-"
-                + getReceiver().getPartition(), appPartitionCount);
-
-        try {
-            signalWarmupComplete.await();
-        } catch (InterruptedException e) {
-            e.printStackTrace();
-        }
-        logger.info("Warmup over with {} iterations over {} keys", warmupIterations, keysCount);
         counter.set(0);
 
         generateEvents(remoteStream, testIterations, keysCount, testSleepInterval, parallelism);
 
-        generateStopEvent(remoteStream, -2, appPartitionCount);
-        try {
-            // only need 1 message/partition. Upon reception, a znode is written and the node exits
-            signalBenchComplete.await();
-        } catch (InterruptedException e) {
-            e.printStackTrace();
-        }
-
-        logger.info("Tests completed after {} warmup and {} test events", warmupIterations * parallelism * keysCount,
-                testIterations * parallelism * keysCount);
+        logger.info("Tests completed after test events", testIterations * parallelism * keysCount);
 
         System.exit(0);
     }
@@ -192,25 +137,6 @@ public class Injector extends AdapterApp {
         }
     }
 
-    private void generateStopEvent(RemoteStream remoteStream, long stopKey, int keysCount) {
-
-        ExecutorService threadPool = Executors.newFixedThreadPool(1);
-        for (int j = 0; j < keysCount; j++) {
-            Event event = new Event();
-            event.put("key", Integer.class, j);
-            event.put("value", Long.class, stopKey);
-            event.put("injector", Integer.class, getReceiver().getPartition());
-            logger.info("Sending stop event with key {}", stopKey);
-            remoteStream.put(event);
-        }
-        threadPool.shutdown();
-        try {
-            threadPool.awaitTermination(1, TimeUnit.MINUTES);
-        } catch (InterruptedException e) {
-            e.printStackTrace();
-        }
-    }
-
     class InjectionTask implements Runnable {
 
         private long iterations;
@@ -232,7 +158,7 @@ public class Injector extends AdapterApp {
                     Event event = new Event();
                     event.put("key", int.class, j);
                     event.put("value", long.class, counter.incrementAndGet());
-                    event.put("injector", Integer.class, getReceiver().getPartition());
+                    event.put("injector", Integer.class, getReceiver().getPartitionId());
                     // logger.info("{}/{}/{}/",
                     // new String[] { Thread.currentThread().getName(), String.valueOf(i), String.valueOf(j),
                     // String.valueOf(event.get("value")) });

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/f9689ea0/subprojects/s4-benchmarks/src/main/java/org/apache/s4/benchmark/simpleApp1/SimpleApp.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-benchmarks/src/main/java/org/apache/s4/benchmark/simpleApp1/SimpleApp.java b/subprojects/s4-benchmarks/src/main/java/org/apache/s4/benchmark/simpleApp1/SimpleApp.java
index e1570f9..00ac454 100644
--- a/subprojects/s4-benchmarks/src/main/java/org/apache/s4/benchmark/simpleApp1/SimpleApp.java
+++ b/subprojects/s4-benchmarks/src/main/java/org/apache/s4/benchmark/simpleApp1/SimpleApp.java
@@ -13,7 +13,6 @@ import com.google.common.collect.ImmutableList;
 import com.google.inject.Inject;
 import com.google.inject.name.Named;
 import com.yammer.metrics.reporting.ConsoleReporter;
-import com.yammer.metrics.reporting.CsvReporter;
 
 public class SimpleApp extends App {
 
@@ -29,34 +28,40 @@ public class SimpleApp extends App {
     @Override
     protected void onInit() {
         File logDirectory = new File(System.getProperty("user.dir") + "/measurements/node"
-                + getReceiver().getPartition());
+                + getReceiver().getPartitionId());
         if (!logDirectory.exists()) {
             if (!logDirectory.mkdirs()) {
                 throw new RuntimeException("Cannot create log dir " + logDirectory.getAbsolutePath());
             }
         }
-        CsvReporter.enable(logDirectory, 5, TimeUnit.SECONDS);
-        ConsoleReporter.enable(30, TimeUnit.SECONDS);
+        // CsvReporter.enable(logDirectory, 5, TimeUnit.SECONDS);
+        ConsoleReporter.enable(10, TimeUnit.SECONDS);
 
         SimplePE1 simplePE1 = createPE(SimplePE1.class, "simplePE1");
         ZkClient zkClient = new ZkClient(zkString);
         zkClient.waitUntilExists("/benchmarkConfig/warmupIterations", TimeUnit.SECONDS, 60);
-        Long warmupIterations = zkClient.readData("/benchmarkConfig/warmupIterations");
-        Long testIterations = zkClient.readData("/benchmarkConfig/testIterations");
 
         // TODO fix hardcoded cluster name (pass injector config?)
         int nbInjectors = zkClient.countChildren("/s4/clusters/testCluster1/tasks");
         simplePE1.setNbInjectors(nbInjectors);
 
-        simplePE1.setWarmupIterations(warmupIterations);
-        simplePE1.setTestIterations(testIterations);
         createInputStream("inputStream", new KeyFinder<Event>() {
 
             @Override
             public List<String> get(Event event) {
                 return ImmutableList.of(event.get("key"));
             }
-        }, simplePE1);
+        }, simplePE1).setParallelism(1);
+
+        SimplePE2 simplePE2 = createPE(SimplePE2.class, "simplePE2");
+
+        createInputStream("inputStream2", new KeyFinder<Event>() {
+
+            @Override
+            public List<String> get(Event event) {
+                return ImmutableList.of(event.get("key"));
+            }
+        }, simplePE2);
 
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/f9689ea0/subprojects/s4-benchmarks/src/main/java/org/apache/s4/benchmark/simpleApp1/SimplePE1.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-benchmarks/src/main/java/org/apache/s4/benchmark/simpleApp1/SimplePE1.java b/subprojects/s4-benchmarks/src/main/java/org/apache/s4/benchmark/simpleApp1/SimplePE1.java
index 20e4fc5..49f0868 100644
--- a/subprojects/s4-benchmarks/src/main/java/org/apache/s4/benchmark/simpleApp1/SimplePE1.java
+++ b/subprojects/s4-benchmarks/src/main/java/org/apache/s4/benchmark/simpleApp1/SimplePE1.java
@@ -1,96 +1,34 @@
 package org.apache.s4.benchmark.simpleApp1;
 
-import java.math.BigDecimal;
-import java.math.MathContext;
-import java.util.concurrent.atomic.AtomicLong;
-
-import org.I0Itec.zkclient.ZkClient;
 import org.apache.s4.base.Event;
 import org.apache.s4.core.ProcessingElement;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.yammer.metrics.Metrics;
-import com.yammer.metrics.core.Gauge;
-
 public class SimplePE1 extends ProcessingElement {
 
     private static Logger logger = LoggerFactory.getLogger(SimplePE1.class);
 
-    private long warmupIterations = -1;
-    int warmedUp = 0;
-    int finished = 0;
-    private long testIterations = -1;
-    AtomicLong counter = new AtomicLong();
-    BigDecimal rate;
-    long lastTime = -1;
     int nbInjectors;
 
-    public void setWarmupIterations(long warmupIterations) {
-        this.warmupIterations = warmupIterations;
-    }
-
-    public void setTestIterations(long testIterations) {
-        this.testIterations = testIterations;
-    }
-
     public void setNbInjectors(int nbInjectors) {
         this.nbInjectors = nbInjectors;
     }
 
     public void onEvent(Event event) {
-        counter.incrementAndGet();
-
-        if (lastTime == -1) {
-            lastTime = System.currentTimeMillis();
-        } else {
-            if ((System.currentTimeMillis() - lastTime) > 1000) {
-                rate = new BigDecimal(counter.get()).divide(new BigDecimal(System.currentTimeMillis() - lastTime),
-                        MathContext.DECIMAL64).multiply(new BigDecimal(1000));
-
-                counter.set(0);
-                lastTime = System.currentTimeMillis();
-            }
-        }
 
         Long value = event.get("value", long.class);
-        // logger.info("reached value {}", value);
-        if (!(warmedUp == nbInjectors) && (value == -1)) {
-            logger.info("**** Warmed up for an injector");
-            addSequentialNode("/warmup/injector-" + event.get("injector", Integer.class));
-            warmedUp++;
-
-        } else if (!(finished == nbInjectors) && (value == (-2))) {
-            logger.info("******* finished an injector **************");
-            finished++;
-            addSequentialNode("/test/injector-" + event.get("injector", Integer.class));
-            if (finished == nbInjectors) {
-                System.exit(0);
-            }
-
-        }
-
-    }
-
-    private void addSequentialNode(String parent) {
-        ZkClient zkClient = new ZkClient(((SimpleApp) getApp()).getZkString());
-        zkClient.createPersistentSequential(parent + "/done", new byte[0]);
-        zkClient.close();
+        logger.trace(String.valueOf(value));
     }
 
     @Override
-    protected void onCreate() {
-        Metrics.newGauge(SimplePE1.class, "simplePE1", new Gauge<BigDecimal>() {
+    protected void onRemove() {
+        // TODO Auto-generated method stub
 
-            @Override
-            public BigDecimal value() {
-                return rate;
-            }
-        });
     }
 
     @Override
-    protected void onRemove() {
+    protected void onCreate() {
         // TODO Auto-generated method stub
 
     }

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/f9689ea0/subprojects/s4-benchmarks/src/main/java/org/apache/s4/benchmark/simpleApp1/SimplePE2.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-benchmarks/src/main/java/org/apache/s4/benchmark/simpleApp1/SimplePE2.java b/subprojects/s4-benchmarks/src/main/java/org/apache/s4/benchmark/simpleApp1/SimplePE2.java
new file mode 100644
index 0000000..96bff6e
--- /dev/null
+++ b/subprojects/s4-benchmarks/src/main/java/org/apache/s4/benchmark/simpleApp1/SimplePE2.java
@@ -0,0 +1,30 @@
+package org.apache.s4.benchmark.simpleApp1;
+
+import org.apache.s4.base.Event;
+import org.apache.s4.core.ProcessingElement;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class SimplePE2 extends ProcessingElement {
+
+    private static Logger logger = LoggerFactory.getLogger(SimplePE2.class);
+
+    public void onEvent(Event event) {
+
+        Long value = event.get("value", long.class);
+        logger.trace(String.valueOf(value));
+    }
+
+    @Override
+    protected void onRemove() {
+        // TODO Auto-generated method stub
+
+    }
+
+    @Override
+    protected void onCreate() {
+        // TODO Auto-generated method stub
+
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/f9689ea0/subprojects/s4-benchmarks/startInjector.sh
----------------------------------------------------------------------
diff --git a/subprojects/s4-benchmarks/startInjector.sh b/subprojects/s4-benchmarks/startInjector.sh
new file mode 100755
index 0000000..ff22b68
--- /dev/null
+++ b/subprojects/s4-benchmarks/startInjector.sh
@@ -0,0 +1,19 @@
+#/bin/bash -x
+
+NB_INJECTORS_PER_NODE=$1
+INJECTOR_CONFIG=$2
+ZK_STRING=$3
+
+##############################################################################################
+#### start injectors, using the dumped classpath in order to avoid downloading the application
+##############################################################################################
+
+
+#killall -9 java
+PROFILING_OPTS=""
+
+host=`hostname`
+
+for ((i = 1; i <= $NB_INJECTORS_PER_NODE; i++)); do
+	java $PROFILING_OPTS -Xmx1G -Xms1G -verbose:gc -Xloggc:gc-injector-$host.log -XX:+PrintGCDetails -XX:+PrintGCTimeStamps -cp `cat classpath.txt` org.apache.s4.core.Main "@$INJECTOR_CONFIG" &
+done

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/f9689ea0/subprojects/s4-benchmarks/startNode.sh
----------------------------------------------------------------------
diff --git a/subprojects/s4-benchmarks/startNode.sh b/subprojects/s4-benchmarks/startNode.sh
index ed9cf3d..c100646 100755
--- a/subprojects/s4-benchmarks/startNode.sh
+++ b/subprojects/s4-benchmarks/startNode.sh
@@ -1,20 +1,16 @@
 #!/bin/bash -x
 BENCH_ROOTDIR=$1
 NODE_CONFIG=$2
-host=$3
-
-if [ "$host" == "localhost" ] || [ "$host" == "127.0.0.1" ] ; then
-  echo "start on localhost"
-else
-  killall -9 java
-fi
 
 cd $BENCH_ROOTDIR
 
+############################################################################################
+#### start S4 node, using the dumped classpath in order to avoid downloading the application
+############################################################################################
 
 # you may add profiling to the application nodes using the correct options for your system
 PROFILING_OPTS=""
 
-java -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTimeStamps -Xloggc:gc-$host.log $PROFILING_OPTS -server -cp `cat classpath.txt` org.apache.s4.core.Main "@$NODE_CONFIG" &
+host=`hostname`
 
-# java -cp `cat classpath.txt` org.apache.s4.core.Main "@`pwd`/src/main/resources/injector.config"
\ No newline at end of file
+java -Xmx4G -Xms4G -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTimeStamps -Xloggc:gc-$host.log $PROFILING_OPTS -server -cp `cat classpath.txt` org.apache.s4.core.Main "@$NODE_CONFIG" &

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/f9689ea0/subprojects/s4-comm/src/main/java/org/apache/s4/comm/DefaultCommModule.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/DefaultCommModule.java b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/DefaultCommModule.java
index c7c43a2..682bb39 100644
--- a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/DefaultCommModule.java
+++ b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/DefaultCommModule.java
@@ -115,6 +115,7 @@ public class DefaultCommModule extends AbstractModule {
             bind(Listener.class).to(
                     (Class<? extends Listener>) Class.forName(config.getString("s4.comm.listener.class")));
 
+            bind(DeserializerExecutorFactory.class).to(DefaultDeserializerExecutorFactory.class);
         } catch (ClassNotFoundException e) {
             logger.error("Cannot find class implementation ", e);
         }

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/f9689ea0/subprojects/s4-comm/src/main/java/org/apache/s4/comm/DefaultDeserializerExecutorFactory.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/DefaultDeserializerExecutorFactory.java b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/DefaultDeserializerExecutorFactory.java
new file mode 100644
index 0000000..bd359db
--- /dev/null
+++ b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/DefaultDeserializerExecutorFactory.java
@@ -0,0 +1,34 @@
+package org.apache.s4.comm;
+
+import java.util.concurrent.Executor;
+import java.util.concurrent.TimeUnit;
+
+import org.jboss.netty.handler.execution.OrderedMemoryAwareThreadPoolExecutor;
+
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import com.google.inject.Inject;
+import com.google.inject.name.Named;
+
+/**
+ * Default executor factory implementation for deserialization stage.
+ * 
+ * 
+ * 
+ */
+public class DefaultDeserializerExecutorFactory implements DeserializerExecutorFactory {
+
+    @Named("s4.listener.maxMemoryPerChannel")
+    @Inject(optional = true)
+    int maxMemoryPerChannel = 100000;
+
+    @Named("s4.listener.maxMemoryPerExecutor")
+    @Inject(optional = true)
+    int maxMemoryPerExecutor = 100000;
+
+    @Override
+    public Executor create() {
+        // NOTE: these are suggested defaults but they might require application-specific tuning
+        return new OrderedMemoryAwareThreadPoolExecutor(1, maxMemoryPerChannel, maxMemoryPerExecutor, 60,
+                TimeUnit.SECONDS, new ThreadFactoryBuilder().setNameFormat("listener-deserializer-%d").build());
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/f9689ea0/subprojects/s4-comm/src/main/java/org/apache/s4/comm/DeserializerExecutorFactory.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/DeserializerExecutorFactory.java b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/DeserializerExecutorFactory.java
new file mode 100644
index 0000000..701ed22
--- /dev/null
+++ b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/DeserializerExecutorFactory.java
@@ -0,0 +1,13 @@
+package org.apache.s4.comm;
+
+import java.util.concurrent.Executor;
+
+/**
+ * Factory for deserializer executors used in listener pipelines.
+ * 
+ */
+public interface DeserializerExecutorFactory {
+
+    Executor create();
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/f9689ea0/subprojects/s4-comm/src/main/java/org/apache/s4/comm/ThrottlingThreadPoolExecutorService.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/ThrottlingThreadPoolExecutorService.java b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/ThrottlingThreadPoolExecutorService.java
new file mode 100644
index 0000000..45659ac
--- /dev/null
+++ b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/ThrottlingThreadPoolExecutorService.java
@@ -0,0 +1,185 @@
+package org.apache.s4.comm;
+
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Callable;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.RejectedExecutionHandler;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.util.concurrent.ForwardingListeningExecutorService;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
+/**
+ * This thread pool executor throttles the submission of new tasks by using a semaphore. Task submissions require
+ * permits, task completions release permits.
+ * 
+ * NOTE: you should either use the {@link ThrottlingThreadPoolExecutorService#submit(java.util.concurrent.Callable)}
+ * methods or the {@link ThrottlingThreadPoolExecutorService#execute(Runnable)} method.
+ * 
+ */
+public class ThrottlingThreadPoolExecutorService extends ForwardingListeningExecutorService {
+
+    private static Logger logger = LoggerFactory.getLogger(ThrottlingThreadPoolExecutorService.class);
+
+    int parallelism;
+    String streamName;
+    final ClassLoader classLoader;
+    int workQueueSize;
+    private BlockingQueue<Runnable> workQueue;
+    private Semaphore queueingPermits;
+    private ListeningExecutorService executorDelegatee;
+
+    /**
+     * 
+     * @param parallelism
+     *            Maximum number of threads in the pool
+     * @param fairParallelism
+     *            If true, in case of contention, waiting threads will be scheduled in a first-in first-out manner. This
+     *            can help ensure ordering, though there is an associated performance cost (typically small).
+     * @param threadName
+     *            Naming scheme
+     * @param workQueueSize
+     *            Queue capacity
+     * @param classLoader
+     *            ClassLoader used as contextClassLoader for spawned threads
+     */
+    public ThrottlingThreadPoolExecutorService(int parallelism, boolean fairParallelism, String threadName,
+            int workQueueSize, final ClassLoader classLoader) {
+        super();
+        this.parallelism = parallelism;
+        this.streamName = threadName;
+        this.classLoader = classLoader;
+        this.workQueueSize = workQueueSize;
+        queueingPermits = new Semaphore(workQueueSize + parallelism, false);
+        ThreadFactory threadFactory = new ThreadFactoryBuilder().setDaemon(true).setNameFormat(threadName)
+                .setThreadFactory(new ThreadFactory() {
+
+                    @Override
+                    public Thread newThread(Runnable r) {
+                        Thread t = new Thread(r);
+                        t.setContextClassLoader(classLoader);
+                        return t;
+                    }
+                }).build();
+        // queueingPermits limits pending tasks to workQueueSize + parallelism, matching the queue capacity below
+        workQueue = new LinkedBlockingQueue<Runnable>(workQueueSize + parallelism);
+        ThreadPoolExecutor eventProcessingExecutor = new ThreadPoolExecutor(parallelism, parallelism, 60,
+                TimeUnit.SECONDS, workQueue, threadFactory, new RejectedExecutionHandler() {
+
+                    @Override
+                    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
+                        // This is not expected to happen.
+                        logger.error("Could not submit task to executor {}", executor.toString());
+                    }
+                });
+        ((ThreadPoolExecutor) eventProcessingExecutor).allowCoreThreadTimeOut(true);
+        executorDelegatee = MoreExecutors.listeningDecorator(eventProcessingExecutor);
+
+    }
+
+    @Override
+    protected ListeningExecutorService delegate() {
+        return executorDelegatee;
+    }
+
+    @Override
+    public <T> ListenableFuture<T> submit(Callable<T> task) {
+        try {
+            queueingPermits.acquire();
+        } catch (InterruptedException e) {
+            e.printStackTrace();
+            Thread.currentThread().interrupt();
+        }
+        ListenableFuture<T> future = super.submit(new CallableWithPermitRelease<T>(task));
+        return future;
+    }
+
+    @Override
+    public <T> ListenableFuture<T> submit(Runnable task, T result) {
+        try {
+            queueingPermits.acquire();
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+        }
+        ListenableFuture<T> future = super.submit(new RunnableWithPermitRelease(task), result);
+        return future;
+    }
+
+    @Override
+    public ListenableFuture<?> submit(Runnable task) {
+        try {
+            queueingPermits.acquire();
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+        }
+        ListenableFuture<?> future = super.submit(new RunnableWithPermitRelease(task));
+        return future;
+    }
+
+    @Override
+    public void execute(Runnable command) {
+        try {
+            queueingPermits.acquire();
+        } catch (InterruptedException e) {
+            e.printStackTrace();
+            Thread.currentThread().interrupt();
+        }
+        super.execute(new RunnableWithPermitRelease(command));
+    }
+
+    /**
+     * Releases a permit after the task is executed
+     * 
+     */
+    class RunnableWithPermitRelease implements Runnable {
+
+        Runnable delegatee;
+
+        public RunnableWithPermitRelease(Runnable delegatee) {
+            this.delegatee = delegatee;
+        }
+
+        @Override
+        public void run() {
+            try {
+                delegatee.run();
+            } finally {
+                queueingPermits.release();
+            }
+
+        }
+    }
+
+    /**
+     * Releases a permit after the task is completed
+     * 
+     */
+    class CallableWithPermitRelease<T> implements Callable<T> {
+
+        Callable<T> delegatee;
+
+        public CallableWithPermitRelease(Callable<T> delegatee) {
+            this.delegatee = delegatee;
+        }
+
+        @Override
+        public T call() throws Exception {
+            try {
+                return delegatee.call();
+            } finally {
+                queueingPermits.release();
+            }
+        }
+
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/f9689ea0/subprojects/s4-comm/src/main/java/org/apache/s4/comm/serialize/KryoSerDeser.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/serialize/KryoSerDeser.java b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/serialize/KryoSerDeser.java
index bff7b4b..3707273 100644
--- a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/serialize/KryoSerDeser.java
+++ b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/serialize/KryoSerDeser.java
@@ -29,7 +29,7 @@ import com.google.inject.Inject;
 import com.google.inject.assistedinject.Assisted;
 
 /**
- * Serializer/deserializer based on <a href="http://code.google.com/p/kryo/">kryo</a>
+ * Serializer/deserializer based on <a href="http://code.google.com/p/kryo/">kryo 2</a>
  * 
  */
 public class KryoSerDeser implements SerializerDeserializer {
@@ -89,7 +89,6 @@ public class KryoSerDeser implements SerializerDeserializer {
         Output output = outputThreadLocal.get();
         try {
             kryoThreadLocal.get().writeClassAndObject(output, message);
-
             return ByteBuffer.wrap(output.toBytes());
         } finally {
             output.clear();

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/f9689ea0/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/TCPEmitter.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/TCPEmitter.java b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/TCPEmitter.java
index b6779a0..d09f2b6 100644
--- a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/TCPEmitter.java
+++ b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/TCPEmitter.java
@@ -180,7 +180,6 @@ public class TCPEmitter implements Emitter, ClusterChangeListener {
             logger.warn("Could not find channel for partition {}", partitionId);
             return;
         }
-
         c.write(buffer).addListener(new MessageSendingListener(partitionId));
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/f9689ea0/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/TCPListener.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/TCPListener.java b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/TCPListener.java
index b2700e5..35b1a52 100644
--- a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/TCPListener.java
+++ b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/TCPListener.java
@@ -19,12 +19,11 @@
 package org.apache.s4.comm.tcp;
 
 import java.net.InetSocketAddress;
-import java.nio.ByteBuffer;
-import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.Executors;
-import java.util.concurrent.SynchronousQueue;
 
 import org.apache.s4.base.Listener;
+import org.apache.s4.base.Receiver;
+import org.apache.s4.comm.DeserializerExecutorFactory;
 import org.apache.s4.comm.topology.Assignment;
 import org.apache.s4.comm.topology.ClusterNode;
 import org.jboss.netty.bootstrap.ServerBootstrap;
@@ -45,6 +44,7 @@ import org.jboss.netty.channel.group.ChannelGroup;
 import org.jboss.netty.channel.group.DefaultChannelGroup;
 import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;
 import org.jboss.netty.handler.codec.frame.LengthFieldBasedFrameDecoder;
+import org.jboss.netty.handler.execution.ExecutionHandler;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -57,14 +57,14 @@ import com.google.inject.name.Named;
  */
 public class TCPListener implements Listener {
     private static final Logger logger = LoggerFactory.getLogger(TCPListener.class);
-    private BlockingQueue<ChannelBuffer> handoffQueue = new SynchronousQueue<ChannelBuffer>();
     private ClusterNode node;
     private ServerBootstrap bootstrap;
     private final ChannelGroup channels = new DefaultChannelGroup();
     private final int nettyTimeout;
 
     @Inject
-    public TCPListener(Assignment assignment, @Named("s4.comm.timeout") int timeout) {
+    public TCPListener(Assignment assignment, @Named("s4.comm.timeout") int timeout, final Receiver receiver,
+            final DeserializerExecutorFactory deserializerExecutorFactory) {
         // wait for an assignment
         node = assignment.assignClusterNode();
         nettyTimeout = timeout;
@@ -77,8 +77,9 @@ public class TCPListener implements Listener {
         bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
             public ChannelPipeline getPipeline() {
                 ChannelPipeline p = Channels.pipeline();
-                p.addLast("1", new LengthFieldBasedFrameDecoder(999999, 0, 4, 0, 4));
-                p.addLast("2", new ChannelHandler(handoffQueue));
+                p.addLast("decoder", new LengthFieldBasedFrameDecoder(999999, 0, 4, 0, 4));
+                p.addLast("executionhandler", new ExecutionHandler(deserializerExecutorFactory.create()));
+                p.addLast("receiver", new ChannelHandler(receiver));
 
                 return p;
             }
@@ -94,15 +95,6 @@ public class TCPListener implements Listener {
         channels.add(c);
     }
 
-    public ByteBuffer recv() {
-        try {
-            return handoffQueue.take().toByteBuffer();
-        } catch (InterruptedException e) {
-            Thread.currentThread().interrupt();
-            return null;
-        }
-    }
-
     @Override
     public int getPartitionId() {
         return node.getPartition();
@@ -118,21 +110,16 @@ public class TCPListener implements Listener {
     }
 
     public class ChannelHandler extends SimpleChannelHandler {
-        private BlockingQueue<ChannelBuffer> handoffQueue;
+        private final Receiver receiver;
 
-        public ChannelHandler(BlockingQueue<ChannelBuffer> handOffQueue) {
-            this.handoffQueue = handOffQueue;
+        public ChannelHandler(Receiver receiver) {
+            this.receiver = receiver;
         }
 
         public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) {
             channels.add(e.getChannel());
             ChannelBuffer buffer = (ChannelBuffer) e.getMessage();
-            try {
-                handoffQueue.put(buffer); // this holds up the Netty upstream I/O thread if
-                                          // there's no receiver at the other end of the handoff queue
-            } catch (InterruptedException ie) {
-                Thread.currentThread().interrupt();
-            }
+            receiver.receive(buffer.toByteBuffer());
         }
 
         public void exceptionCaught(ChannelHandlerContext context, ExceptionEvent event) {
@@ -141,17 +128,17 @@ public class TCPListener implements Listener {
             c.close().addListener(new ChannelFutureListener() {
                 @Override
                 public void operationComplete(ChannelFuture future) throws Exception {
-                    if (future.isSuccess())
+                    if (future.isSuccess()) {
                         channels.remove(future.getChannel());
-                    else
+                    } else {
                         logger.error("FAILED to close channel");
+                    }
                 }
             });
         }
 
         @Override
         public void channelClosed(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
-            // TODO Auto-generated method stub
             super.channelClosed(ctx, e);
         }
 

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/f9689ea0/subprojects/s4-comm/src/main/java/org/apache/s4/comm/udp/UDPListener.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/udp/UDPListener.java b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/udp/UDPListener.java
index d7f6275..e719cbc 100644
--- a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/udp/UDPListener.java
+++ b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/udp/UDPListener.java
@@ -26,6 +26,7 @@ import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.SynchronousQueue;
 
 import org.apache.s4.base.Listener;
+import org.apache.s4.base.Receiver;
 import org.apache.s4.comm.topology.Assignment;
 import org.apache.s4.comm.topology.ClusterNode;
 import org.jboss.netty.buffer.ChannelBuffer;
@@ -46,13 +47,15 @@ public class UDPListener implements Listener, Runnable {
     static int BUFFER_LENGTH = 65507;
     private BlockingQueue<ByteBuffer> handoffQueue = new SynchronousQueue<ByteBuffer>();
     private ClusterNode node;
+    private Receiver receiver;
 
     @Inject
-    public UDPListener(Assignment assignment) {
-        this(assignment, -1);
+    public UDPListener(Assignment assignment, final Receiver receiver) {
+        this(assignment, -1, receiver);
     }
 
-    public UDPListener(Assignment assignment, int UDPBufferSize) {
+    public UDPListener(Assignment assignment, int UDPBufferSize, final Receiver receiver) {
+        this.receiver = receiver;
         // wait for an assignment
         node = assignment.assignClusterNode();
 
@@ -80,11 +83,12 @@ public class UDPListener implements Listener, Runnable {
                 ChannelBuffer copiedBuffer = ChannelBuffers.copiedBuffer(datagram.getData(), datagram.getOffset(),
                         datagram.getLength());
                 datagram.setLength(BUFFER_LENGTH);
-                try {
-                    handoffQueue.put(copiedBuffer.toByteBuffer());
-                } catch (InterruptedException ie) {
-                    Thread.currentThread().interrupt();
-                }
+                receiver.receive(copiedBuffer.toByteBuffer());
+                // try {
+                // handoffQueue.put(copiedBuffer.toByteBuffer());
+                // } catch (InterruptedException ie) {
+                // Thread.currentThread().interrupt();
+                // }
             }
         } catch (IOException e) {
             throw new RuntimeException(e);

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/f9689ea0/subprojects/s4-comm/src/main/resources/default.s4.comm.properties
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/main/resources/default.s4.comm.properties b/subprojects/s4-comm/src/main/resources/default.s4.comm.properties
index 36417b4..33d75e7 100644
--- a/subprojects/s4-comm/src/main/resources/default.s4.comm.properties
+++ b/subprojects/s4-comm/src/main/resources/default.s4.comm.properties
@@ -5,4 +5,11 @@ s4.comm.listener.class=org.apache.s4.comm.tcp.TCPListener
 s4.comm.timeout=1000
 s4.cluster.zk_address = localhost:2181
 s4.cluster.zk_session_timeout = 10000
-s4.cluster.zk_connection_timeout = 10000
\ No newline at end of file
+s4.cluster.zk_connection_timeout = 10000
+
+# how many threads to use for the sender stage (i.e. serialization)
+s4.sender.parallelism=1
+# maximum number of events in the buffer of the sender stage
+s4.sender.workQueueSize=10000
+# maximum number of events in the buffer of the processing stage
+s4.stream.workQueueSize=10000
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/f9689ea0/subprojects/s4-comm/src/test/java/org/apache/s4/comm/DeliveryTestUtil.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/test/java/org/apache/s4/comm/DeliveryTestUtil.java b/subprojects/s4-comm/src/test/java/org/apache/s4/comm/DeliveryTestUtil.java
deleted file mode 100644
index 22b68a0..0000000
--- a/subprojects/s4-comm/src/test/java/org/apache/s4/comm/DeliveryTestUtil.java
+++ /dev/null
@@ -1,162 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.s4.comm;
-
-import java.nio.ByteBuffer;
-
-import org.apache.s4.base.Emitter;
-import org.apache.s4.base.Listener;
-import org.jboss.netty.buffer.ChannelBuffers;
-
-import com.google.inject.Inject;
-import com.google.inject.name.Named;
-
-/*
- * Test util for communication protocols.
- *
- * <ul>
- * <li> The util defines Send and Receive Threads </li>
- * <li> SendThread sends out a pre-defined number of messages to all the partitions </li>
- * <li> ReceiveThread receives all/most of these messages </li>
- * <li> To avoid the receiveThread waiting for ever, it spawns a TimerThread that would
- * interrupt after a pre-defined but long enough interval </li>
- * </ul>
- *
- */
-public class DeliveryTestUtil {
-
-    private final Emitter emitter;
-    private final Listener listener;
-    private final int interval;
-    private int numMessages;
-    private int sleepCount;
-
-    // public Thread sendThread, receiveThread;
-    private final int messagesExpected;
-
-    @Inject
-    public DeliveryTestUtil(Emitter emitter, Listener listener, @Named("s4.emitter.send.interval") int interval,
-            @Named("s4.emitter.send.numMessages") int numMessages, @Named("s4.listener.recv.sleepCount") int sleepCount) {
-        this.emitter = emitter;
-        this.listener = listener;
-        this.interval = interval;
-        this.numMessages = numMessages;
-        this.sleepCount = sleepCount;
-        this.messagesExpected = numMessages * this.emitter.getPartitionCount();
-
-        // this.sendThread = new SendThread();
-        // this.receiveThread = new ReceiveThread();
-    }
-
-    public class SendThread extends Thread {
-        @Override
-        public void run() {
-            try {
-                for (int partition = 0; partition < emitter.getPartitionCount(); partition++) {
-                    for (int i = 0; i < numMessages; i++) {
-                        ByteBuffer message = ChannelBuffers.wrappedBuffer((new String("message-" + i)).getBytes())
-                                .toByteBuffer();
-                        emitter.send(partition, message);
-                        Thread.sleep(interval);
-                    }
-                }
-            } catch (Exception e) {
-                e.printStackTrace();
-                return;
-            }
-        }
-    }
-
-    /*
-     * TimerThread - interrupts the passed thread, after specified time-interval.
-     * 
-     * FIXME we should use number of events rather than time-based interval
-     */
-    class TimerThread extends Thread {
-        private final Thread watchThread;
-        private volatile int sleepCounter;
-
-        TimerThread(Thread watchThread) {
-            this.watchThread = watchThread;
-            this.sleepCounter = sleepCount;
-        }
-
-        public void resetSleepCounter() {
-            this.sleepCounter = sleepCount;
-        }
-
-        public void clearSleepCounter() {
-            this.sleepCounter = 0;
-        }
-
-        private int getCounter() {
-            return sleepCounter--;
-
-        }
-
-        @Override
-        public void run() {
-            try {
-                while (getCounter() > 0) {
-                    Thread.sleep(interval);
-                }
-                watchThread.interrupt();
-            } catch (Exception e) {
-                e.printStackTrace();
-            }
-        }
-    }
-
-    class ReceiveThread extends Thread {
-        private int messagesReceived = 0;
-
-        @Override
-        public void run() {
-
-            // start the timer thread to interrupt if blocked for too long
-            TimerThread timer = new TimerThread(this);
-            timer.start();
-            while (messagesReceived < messagesExpected) {
-                ByteBuffer message = listener.recv();
-                timer.resetSleepCounter();
-                if (message != null)
-                    messagesReceived++;
-                else
-                    break;
-            }
-            timer.clearSleepCounter();
-        }
-
-        private boolean moreMessages() {
-            return ((messagesExpected - messagesReceived) > 0);
-        }
-    }
-
-    public Thread newSendThread() {
-        return new SendThread();
-    }
-
-    public Thread newReceiveThread() {
-        return new ReceiveThread();
-    }
-
-    public boolean moreMessages(Thread recvThread) {
-        return ((ReceiveThread) recvThread).moreMessages();
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/f9689ea0/subprojects/s4-comm/src/test/java/org/apache/s4/comm/tcp/MultiPartitionDeliveryTest.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/test/java/org/apache/s4/comm/tcp/MultiPartitionDeliveryTest.java b/subprojects/s4-comm/src/test/java/org/apache/s4/comm/tcp/MultiPartitionDeliveryTest.java
deleted file mode 100644
index 1813f7a..0000000
--- a/subprojects/s4-comm/src/test/java/org/apache/s4/comm/tcp/MultiPartitionDeliveryTest.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.s4.comm.tcp;
-
-import java.io.IOException;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class MultiPartitionDeliveryTest extends TCPCommTest {
-
-    private static Logger logger = LoggerFactory.getLogger(MultiPartitionDeliveryTest.class);
-
-    public MultiPartitionDeliveryTest() throws IOException {
-        super(6);
-        logger = LoggerFactory.getLogger(MultiPartitionDeliveryTest.class);
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/f9689ea0/subprojects/s4-comm/src/test/java/org/apache/s4/comm/tcp/SimpleDeliveryTest.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/test/java/org/apache/s4/comm/tcp/SimpleDeliveryTest.java b/subprojects/s4-comm/src/test/java/org/apache/s4/comm/tcp/SimpleDeliveryTest.java
deleted file mode 100644
index e46ed47..0000000
--- a/subprojects/s4-comm/src/test/java/org/apache/s4/comm/tcp/SimpleDeliveryTest.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.s4.comm.tcp;
-
-import java.io.IOException;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class SimpleDeliveryTest extends TCPCommTest {
-
-    private static Logger logger = LoggerFactory.getLogger(SimpleDeliveryTest.class);
-
-    public SimpleDeliveryTest() throws IOException {
-        super();
-        logger = LoggerFactory.getLogger(SimpleDeliveryTest.class);
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/f9689ea0/subprojects/s4-comm/src/test/java/org/apache/s4/comm/tcp/TCPBasicTest.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/test/java/org/apache/s4/comm/tcp/TCPBasicTest.java b/subprojects/s4-comm/src/test/java/org/apache/s4/comm/tcp/TCPBasicTest.java
new file mode 100644
index 0000000..741524a
--- /dev/null
+++ b/subprojects/s4-comm/src/test/java/org/apache/s4/comm/tcp/TCPBasicTest.java
@@ -0,0 +1,49 @@
+package org.apache.s4.comm.tcp;
+
+import java.util.concurrent.TimeUnit;
+
+import junit.framework.Assert;
+
+import org.apache.s4.base.Emitter;
+import org.apache.s4.base.Listener;
+import org.apache.s4.base.SerializerDeserializer;
+import org.apache.s4.comm.DefaultCommModule;
+import org.apache.s4.fixtures.CommTestUtils;
+import org.apache.s4.fixtures.MockReceiverModule;
+import org.apache.s4.fixtures.NoOpReceiverModule;
+import org.apache.s4.fixtures.ZkBasedTest;
+import org.junit.Test;
+
+import com.google.common.io.Resources;
+import com.google.inject.Guice;
+import com.google.inject.Injector;
+
+public class TCPBasicTest extends ZkBasedTest {
+
+    @Test
+    public void testSingleMessage() {
+
+        try {
+            Injector injector1 = Guice
+                    .createInjector(new DefaultCommModule(Resources.getResource("default.s4.comm.properties")
+                            .openStream(), "cluster1"), new NoOpReceiverModule());
+            Emitter emitter = injector1.getInstance(Emitter.class);
+
+            Injector injector2 = Guice
+                    .createInjector(new DefaultCommModule(Resources.getResource("default.s4.comm.properties")
+                            .openStream(), "cluster1"), new MockReceiverModule());
+            // creating the listener will inject assignment (i.e. assign a partition) and receiver (delegatee for
+            // listener, here a mock which simply intercepts the message and notifies through a countdown latch)
+            injector2.getInstance(Listener.class);
+
+            emitter.send(0, injector1.getInstance(SerializerDeserializer.class).serialize(CommTestUtils.MESSAGE));
+
+            // check receiver got the message
+            Assert.assertTrue(CommTestUtils.SIGNAL_MESSAGE_RECEIVED.await(5, TimeUnit.SECONDS));
+
+        } catch (Exception e) {
+            e.printStackTrace();
+            Assert.fail(e.getMessage());
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/f9689ea0/subprojects/s4-comm/src/test/java/org/apache/s4/comm/tcp/TCPCommTest.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/test/java/org/apache/s4/comm/tcp/TCPCommTest.java b/subprojects/s4-comm/src/test/java/org/apache/s4/comm/tcp/TCPCommTest.java
deleted file mode 100644
index c9153ce..0000000
--- a/subprojects/s4-comm/src/test/java/org/apache/s4/comm/tcp/TCPCommTest.java
+++ /dev/null
@@ -1,68 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.s4.comm.tcp;
-
-import java.io.IOException;
-
-import org.apache.s4.comm.DefaultCommModule;
-import org.apache.s4.comm.DeliveryTestUtil;
-import org.apache.s4.comm.util.ProtocolTestUtil;
-import org.junit.Assert;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.io.Resources;
-import com.google.inject.Guice;
-import com.google.inject.Injector;
-
-public abstract class TCPCommTest extends ProtocolTestUtil {
-
-    private static Logger logger = LoggerFactory.getLogger(TCPCommTest.class);
-    DeliveryTestUtil util;
-    public final static String CLUSTER_NAME = "cluster1";
-
-    public TCPCommTest() throws IOException {
-        super();
-    }
-
-    public TCPCommTest(int numTasks) throws IOException {
-        super(numTasks);
-    }
-
-    public Injector newInjector() {
-        try {
-            return Guice.createInjector(new DefaultCommModule(Resources.getResource("default.s4.comm.properties")
-                    .openStream(), CLUSTER_NAME));
-        } catch (IOException e) {
-            Assert.fail();
-            return null;
-        }
-    }
-
-    @Override
-    public void testDelivery() throws InterruptedException {
-        startThreads();
-        waitForThreads();
-
-        Assert.assertTrue("Message Delivery", messageDelivery());
-
-        logger.info("Message ordering - " + messageOrdering());
-        Assert.assertTrue("Pairwise message ordering", messageOrdering());
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/f9689ea0/subprojects/s4-comm/src/test/java/org/apache/s4/comm/topology/AssignmentsFromZKTest1.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/test/java/org/apache/s4/comm/topology/AssignmentsFromZKTest1.java b/subprojects/s4-comm/src/test/java/org/apache/s4/comm/topology/AssignmentsFromZKTest1.java
index e3ff947..8b1ff9a 100644
--- a/subprojects/s4-comm/src/test/java/org/apache/s4/comm/topology/AssignmentsFromZKTest1.java
+++ b/subprojects/s4-comm/src/test/java/org/apache/s4/comm/topology/AssignmentsFromZKTest1.java
@@ -26,6 +26,7 @@ import java.util.concurrent.TimeUnit;
 
 import org.apache.s4.comm.tools.TaskSetup;
 import org.apache.s4.fixtures.CommTestUtils;
+import org.apache.s4.fixtures.ZkBasedTest;
 import org.junit.Test;
 
 import com.google.common.base.Splitter;
@@ -36,7 +37,7 @@ import com.google.common.collect.Sets;
  * in that other class.
  * 
  */
-public class AssignmentsFromZKTest1 extends ZKBaseTest {
+public class AssignmentsFromZKTest1 extends ZkBasedTest {
 
     @Test
     public void testAssignmentFor1Cluster() throws Exception {

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/f9689ea0/subprojects/s4-comm/src/test/java/org/apache/s4/comm/topology/ClustersFromZKTest.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/test/java/org/apache/s4/comm/topology/ClustersFromZKTest.java b/subprojects/s4-comm/src/test/java/org/apache/s4/comm/topology/ClustersFromZKTest.java
index 768ffd0..f17bf26 100644
--- a/subprojects/s4-comm/src/test/java/org/apache/s4/comm/topology/ClustersFromZKTest.java
+++ b/subprojects/s4-comm/src/test/java/org/apache/s4/comm/topology/ClustersFromZKTest.java
@@ -26,13 +26,14 @@ import java.util.concurrent.TimeUnit;
 
 import org.apache.s4.comm.tools.TaskSetup;
 import org.apache.s4.fixtures.CommTestUtils;
+import org.apache.s4.fixtures.ZkBasedTest;
 import org.junit.Ignore;
 import org.junit.Test;
 
 import com.google.common.base.Splitter;
 import com.google.common.collect.Sets;
 
-public class ClustersFromZKTest extends ZKBaseTest {
+public class ClustersFromZKTest extends ZkBasedTest {
 
     @Test
     @Ignore

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/f9689ea0/subprojects/s4-comm/src/test/java/org/apache/s4/comm/topology/ZKBaseTest.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/test/java/org/apache/s4/comm/topology/ZKBaseTest.java b/subprojects/s4-comm/src/test/java/org/apache/s4/comm/topology/ZKBaseTest.java
deleted file mode 100644
index 2ef95f4..0000000
--- a/subprojects/s4-comm/src/test/java/org/apache/s4/comm/topology/ZKBaseTest.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.s4.comm.topology;
-
-import java.io.IOException;
-
-import org.apache.s4.fixtures.CommTestUtils;
-import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.server.NIOServerCnxn.Factory;
-import org.junit.After;
-import org.junit.Before;
-
-public class ZKBaseTest {
-
-    private Factory zkFactory;
-
-    @Before
-    public void setUp() throws IOException, InterruptedException, KeeperException {
-        CommTestUtils.cleanupTmpDirs();
-        zkFactory = CommTestUtils.startZookeeperServer();
-
-    }
-
-    @After
-    public void tearDown() throws Exception {
-        CommTestUtils.stopZookeeperServer(zkFactory);
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/f9689ea0/subprojects/s4-comm/src/test/java/org/apache/s4/comm/udp/MultiPartitionDeliveryTest.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/test/java/org/apache/s4/comm/udp/MultiPartitionDeliveryTest.java b/subprojects/s4-comm/src/test/java/org/apache/s4/comm/udp/MultiPartitionDeliveryTest.java
deleted file mode 100644
index 4e06233..0000000
--- a/subprojects/s4-comm/src/test/java/org/apache/s4/comm/udp/MultiPartitionDeliveryTest.java
+++ /dev/null
@@ -1,27 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.s4.comm.udp;
-
-import java.io.IOException;
-
-public class MultiPartitionDeliveryTest extends UDPCommTest {
-    public MultiPartitionDeliveryTest() throws IOException {
-        super(2);
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/f9689ea0/subprojects/s4-comm/src/test/java/org/apache/s4/comm/udp/SimpleDeliveryTest.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/test/java/org/apache/s4/comm/udp/SimpleDeliveryTest.java b/subprojects/s4-comm/src/test/java/org/apache/s4/comm/udp/SimpleDeliveryTest.java
deleted file mode 100644
index b1c5d8a..0000000
--- a/subprojects/s4-comm/src/test/java/org/apache/s4/comm/udp/SimpleDeliveryTest.java
+++ /dev/null
@@ -1,27 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.s4.comm.udp;
-
-import java.io.IOException;
-
-public class SimpleDeliveryTest extends UDPCommTest {
-    public SimpleDeliveryTest() throws IOException {
-        super();
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/f9689ea0/subprojects/s4-comm/src/test/java/org/apache/s4/comm/udp/UDPBasicTest.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/test/java/org/apache/s4/comm/udp/UDPBasicTest.java b/subprojects/s4-comm/src/test/java/org/apache/s4/comm/udp/UDPBasicTest.java
new file mode 100644
index 0000000..54e6347
--- /dev/null
+++ b/subprojects/s4-comm/src/test/java/org/apache/s4/comm/udp/UDPBasicTest.java
@@ -0,0 +1,48 @@
+package org.apache.s4.comm.udp;
+
+import java.util.concurrent.TimeUnit;
+
+import junit.framework.Assert;
+
+import org.apache.s4.base.Emitter;
+import org.apache.s4.base.Listener;
+import org.apache.s4.base.SerializerDeserializer;
+import org.apache.s4.comm.DefaultCommModule;
+import org.apache.s4.fixtures.CommTestUtils;
+import org.apache.s4.fixtures.MockReceiverModule;
+import org.apache.s4.fixtures.NoOpReceiverModule;
+import org.apache.s4.fixtures.ZkBasedTest;
+import org.junit.Test;
+
+import com.google.common.io.Resources;
+import com.google.inject.Guice;
+import com.google.inject.Injector;
+
+public class UDPBasicTest extends ZkBasedTest {
+
+    @Test
+    public void testSingleMessage() {
+
+        try {
+            Injector injector1 = Guice.createInjector(
+                    new DefaultCommModule(Resources.getResource("udp.s4.comm.properties").openStream(), "cluster1"),
+                    new NoOpReceiverModule());
+            Emitter emitter = injector1.getInstance(Emitter.class);
+
+            Injector injector2 = Guice.createInjector(
+                    new DefaultCommModule(Resources.getResource("udp.s4.comm.properties").openStream(), "cluster1"),
+                    new MockReceiverModule());
+            // creating the listener will inject assignment (i.e. assign a partition) and receiver (delegatee for
+            // listener)
+            injector2.getInstance(Listener.class);
+
+            emitter.send(0, injector1.getInstance(SerializerDeserializer.class).serialize(CommTestUtils.MESSAGE));
+
+            Assert.assertTrue(CommTestUtils.SIGNAL_MESSAGE_RECEIVED.await(5, TimeUnit.SECONDS));
+
+        } catch (Exception e) {
+            e.printStackTrace();
+            Assert.fail(e.getMessage());
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/f9689ea0/subprojects/s4-comm/src/test/java/org/apache/s4/comm/udp/UDPCommTest.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/test/java/org/apache/s4/comm/udp/UDPCommTest.java b/subprojects/s4-comm/src/test/java/org/apache/s4/comm/udp/UDPCommTest.java
deleted file mode 100644
index 0f6004f..0000000
--- a/subprojects/s4-comm/src/test/java/org/apache/s4/comm/udp/UDPCommTest.java
+++ /dev/null
@@ -1,79 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.s4.comm.udp;
-
-import java.io.IOException;
-
-import org.apache.s4.comm.DefaultCommModule;
-import org.apache.s4.comm.DeliveryTestUtil;
-import org.apache.s4.comm.util.ProtocolTestUtil;
-import org.junit.Assert;
-
-import com.google.common.io.Resources;
-import com.google.inject.AbstractModule;
-import com.google.inject.Guice;
-import com.google.inject.Injector;
-import com.google.inject.name.Names;
-
-public abstract class UDPCommTest extends ProtocolTestUtil {
-    DeliveryTestUtil util;
-
-    public UDPCommTest() throws IOException {
-        super();
-    }
-
-    public UDPCommTest(int numTasks) throws IOException {
-        super(numTasks);
-    }
-
-    @Override
-    protected Injector newInjector() throws IOException {
-        return Guice.createInjector(new DefaultCommModule(Resources.getResource("udp.s4.comm.properties").openStream(),
-                "cluster1"), new UDPCommTestModule());
-    }
-
-    class UDPCommTestModule extends AbstractModule {
-        UDPCommTestModule() {
-        }
-
-        @Override
-        protected void configure() {
-            bind(Integer.class).annotatedWith(Names.named("emitter.send.interval")).toInstance(100);
-            bind(Integer.class).annotatedWith(Names.named("emitter.send.numMessages")).toInstance(200);
-            bind(Integer.class).annotatedWith(Names.named("listener.recv.sleepCount")).toInstance(10);
-        }
-    }
-
-    /**
-     * Tests the protocol. If all components function without throwing exceptions, the test passes.
-     * 
-     * @throws InterruptedException
-     */
-    @Override
-    public void testDelivery() {
-        try {
-            Thread.sleep(1000);
-            startThreads();
-            waitForThreads();
-            Assert.assertTrue("Message Delivery", messageDelivery());
-        } catch (Exception e) {
-            Assert.fail("UDP DeliveryTest");
-        }
-    }
-}