You are viewing a plain text version of this content. The canonical link for it is here.
Posted to s4-commits@incubator.apache.org by mm...@apache.org on 2012/12/03 18:31:49 UTC

git commit: Improve throttling and benchmark apps - limit number of pending messages for TCP channel - added benchmark app with internal communications

Updated Branches:
  refs/heads/S4-95 cc2c4e888 -> 8117de6df


Improve throttling and benchmark apps
- limit number of pending messages for TCP channel
- added benchmark app with internal communications


Project: http://git-wip-us.apache.org/repos/asf/incubator-s4/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-s4/commit/8117de6d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-s4/tree/8117de6d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-s4/diff/8117de6d

Branch: refs/heads/S4-95
Commit: 8117de6df34eca0014af39b9e1f46d264227870a
Parents: cc2c4e8
Author: Matthieu Morel <mm...@apache.org>
Authored: Sun Dec 2 22:38:42 2012 +0100
Committer: Matthieu Morel <mm...@apache.org>
Committed: Mon Dec 3 19:31:05 2012 +0100

----------------------------------------------------------------------
 build.gradle                                       |    5 +-
 subprojects/s4-benchmarks/README.md                |   15 +-
 subprojects/s4-benchmarks/config/injector.config   |    4 +-
 subprojects/s4-benchmarks/config/node.config       |    2 +-
 subprojects/s4-benchmarks/s4-benchmarks.gradle     |    2 +-
 .../java/org/apache/s4/benchmark/dag/DagApp.java   |  109 ++++++++
 .../java/org/apache/s4/benchmark/dag/FirstPE.java  |   42 +++
 .../java/org/apache/s4/benchmark/dag/LastPE.java   |   30 ++
 .../java/org/apache/s4/benchmark/dag/PipePE.java   |   42 +++
 .../s4/benchmark/prodcon/ProducerConsumerApp.java  |   79 ++++++
 .../org/apache/s4/benchmark/prodcon/SimplePE1.java |   36 +++
 .../org/apache/s4/benchmark/prodcon/SimplePE2.java |   30 ++
 .../apache/s4/benchmark/simpleApp1/Injector.java   |  176 ------------
 .../apache/s4/benchmark/simpleApp1/SimpleApp.java  |   79 ------
 .../apache/s4/benchmark/simpleApp1/SimplePE1.java  |   36 ---
 .../apache/s4/benchmark/simpleApp1/SimplePE2.java  |   30 --
 .../org/apache/s4/benchmark/utils/Injector.java    |  174 ++++++++++++
 .../java/org/apache/s4/comm/DefaultCommModule.java |    3 +-
 .../comm/DefaultDeserializerExecutorFactory.java   |   34 ---
 .../s4/comm/DeserializerExecutorFactory.java       |   10 +
 .../comm/ThrottlingThreadPoolExecutorService.java  |  185 -------------
 .../MemoryAwareDeserializerExecutorFactory.java    |   44 +++
 ...haredThrottlingDeserializerExecutorFactory.java |   36 +++
 .../ThrottlingThreadPoolExecutorService.java       |  212 +++++++++++++++
 .../java/org/apache/s4/comm/tcp/TCPEmitter.java    |   39 +++-
 .../java/org/apache/s4/comm/tcp/TCPListener.java   |   22 ++-
 .../org/apache/s4/comm/tcp/TCPRemoteEmitter.java   |   10 +-
 .../src/main/resources/default.s4.comm.properties  |    4 +
 .../main/java/org/apache/s4/core/SenderImpl.java   |    1 +
 .../DefaultSenderExecutorServiceFactory.java       |    6 +-
 ...aultStreamProcessingExecutorServiceFactory.java |    2 +-
 .../java/org/apache/s4/core/util/S4Metrics.java    |   36 ++-
 .../org/apache/s4/fixtures/MockCoreModule.java     |    4 +-
 33 files changed, 967 insertions(+), 572 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/8117de6d/build.gradle
----------------------------------------------------------------------
diff --git a/build.gradle b/build.gradle
index 89113f2..e1b981d 100644
--- a/build.gradle
+++ b/build.gradle
@@ -62,7 +62,7 @@ project.ext["libraries"] = [
     minlog:             'com.googlecode:minlog:1.2',
     // NOTE shaded jar is not resolved correctly, we include it in /lib directory
     reflectasm:         'com.esotericsoftware.reflectasm:reflectasm:1.07-shaded',
-    netty:              'io.netty:netty:3.5.9.Final',
+    netty:              'io.netty:netty:3.5.11.Final',
     mockito_core:       'org.mockito:mockito-core:1.9.0',
     commons_config:     'commons-configuration:commons-configuration:1.6',
     commons_codec:      'commons-codec:commons-codec:1.4',
@@ -76,7 +76,7 @@ project.ext["libraries"] = [
     log4j:              'log4j:log4j:1.2.15',
     logback_core:       'ch.qos.logback:logback-core:0.9.29',
     logback_classic:    'ch.qos.logback:logback-classic:0.9.29',
-    zk:                 'org.apache.zookeeper:zookeeper:3.3.1',
+    zk:                 'org.apache.zookeeper:zookeeper:3.3.3',
     jcip:               'net.jcip:jcip-annotations:1.0',
     junit:              'junit:junit:4.10',
     zkclient:           'com.github.sgroschupf:zkclient:0.1',
@@ -108,6 +108,7 @@ subprojects {
         exclude group: 'com.sun.jdmk', module: 'jmxtools'
         exclude group: 'com.sun.jmx', module: 'jmxri'
         exclude group: 'javax.jms', module: 'jms'
+        exclude group: 'javax.mail', module: 'mail'
     }
 
     dependencies {

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/8117de6d/subprojects/s4-benchmarks/README.md
----------------------------------------------------------------------
diff --git a/subprojects/s4-benchmarks/README.md b/subprojects/s4-benchmarks/README.md
index c5bea72..f8528fb 100644
--- a/subprojects/s4-benchmarks/README.md
+++ b/subprojects/s4-benchmarks/README.md
@@ -3,7 +3,7 @@ Simple S4 Benchmarking Framework
 
 This framework is intended primarily to identify hotspots in S4 platform code easily and to evaluate the impact of refactorings or new features. 
 
-> The numbers it produces are only useful in comparison with a baseline from other measurements from the same benchmark and do not represent absolute performance numbers. For that, one should use a full-fledged load injection framework or measure the performance of a live application.
+> The numbers it produces are mainly useful in comparison with a baseline from other measurements from the same benchmark and do not represent absolute performance numbers. For that, one should use a full-fledged load injection framework or measure the performance of a live application.
 
 That said, let's look at what the benchmarking framework does and how to use it.
 
@@ -11,13 +11,19 @@ That said, let's look at what the benchmarking framework does and how to use it.
 
 The benchmarking framework consists of a multithreaded injector and an application. App nodes and injector are launched directly, there is no deployment step. This allows to skip the packaging and deployment steps and to easily add profiling parameters, but requires a source distribution and a shared file system.
 
-The simplest application does nothing but count incoming keyed messages, but other simple application can be easily added, in particular, involving multiple communicating PEs. There are 2 input streams available: `inputStream` and `inputStream2`.
+Two simple applications are provided:
+
+* A producer-consumer application between 2 different logical clusters. It measures the overhead of the communication layer. There are 2 available external input streams: `inputStream` and `inputStream2`. You may inject into one or both of these independent streams. (A node will process more events overall if it gets them from more parallel sources, unless it reaches network or CPU limits.)
+* A pipeline of processing elements that mixes external and internal event communication.
+
+There is almost no processing involved in the PEs themselves, other than delegating to the next processing element in the pipeline, if any.
 
 The injector sends basic keyed messages to a given named stream. The outputstream of the injector uses a keyfinder to partition the events across the application nodes.
 
 We get metrics from the probes across the codebase, in particular:
 - the rate of events sent per second (in the injector)
 - the rate of events received per second (in the app nodes)
+- other metrics, such as the number of dequeued messages per stream, the ratio between local and remote events, etc.
 
 Metrics from the platform code are computed with weighted moving averages. It is recommended to let the application run for a few minutes and observe the metrics from the last minute.
 
@@ -27,6 +33,9 @@ Profiling options (e.g. YourKit) can easily be added to the injector or app node
 
 We provide a script for that purpose: `bench-cluster.sh`
 
+You can use an arbitrary number of injectors and processing nodes in order to vary the load and the number of concurrent connections.
+
+
 Input parameters are:
 
 - host names on which to start S4 nodes
@@ -41,6 +50,8 @@ Exmample configuration files are available in `/config` and you can configure :
 - the number of test iterations
 - the number of parallel injection threads
 - the number of threads for the sender stage
+- the number of events injected between two pauses in the injection
+- the duration of the pause (can be 0)
 - etc…
 
 By default in this example the size of a message is 188 bytes.

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/8117de6d/subprojects/s4-benchmarks/config/injector.config
----------------------------------------------------------------------
diff --git a/subprojects/s4-benchmarks/config/injector.config b/subprojects/s4-benchmarks/config/injector.config
index 507e1e3..e02587c 100644
--- a/subprojects/s4-benchmarks/config/injector.config
+++ b/subprojects/s4-benchmarks/config/injector.config
@@ -1,3 +1,3 @@
 -c=testCluster1
--appClass=org.apache.s4.benchmark.simpleApp1.Injector
--p=s4.sender.parallelism=4,s4.adapter.output.stream=inputStream,s4.benchmark.keysCount=100000,s4.benchmark.testIterations=1000000,s4.benchmark.testSleepInterval=0,s4.benchmark.injector.parallelism=2
+-appClass=org.apache.s4.benchmark.utils.Injector
+-p=s4.sender.parallelism=4,s4.adapter.output.stream=inputStream,s4.benchmark.keysCount=10000,s4.benchmark.testIterations=1000000,s4.benchmark.injector.iterationsBeforePause=1000,s4.benchmark.pauseTimeMs=20,s4.benchmark.injector.parallelism=2

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/8117de6d/subprojects/s4-benchmarks/config/node.config
----------------------------------------------------------------------
diff --git a/subprojects/s4-benchmarks/config/node.config b/subprojects/s4-benchmarks/config/node.config
index cb0e4c2..28dfdff 100644
--- a/subprojects/s4-benchmarks/config/node.config
+++ b/subprojects/s4-benchmarks/config/node.config
@@ -1,3 +1,3 @@
 -c=testCluster2
--appClass=org.apache.s4.benchmark.simpleApp1.SimpleApp
+-appClass=org.apache.s4.benchmark.prodcon.ProducerConsumerApp
 -zk=127.0.0.1:2181
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/8117de6d/subprojects/s4-benchmarks/s4-benchmarks.gradle
----------------------------------------------------------------------
diff --git a/subprojects/s4-benchmarks/s4-benchmarks.gradle b/subprojects/s4-benchmarks/s4-benchmarks.gradle
index 2fc10bc..71b2823 100644
--- a/subprojects/s4-benchmarks/s4-benchmarks.gradle
+++ b/subprojects/s4-benchmarks/s4-benchmarks.gradle
@@ -78,5 +78,5 @@ task cp << {
     description='Dumps the classpath for running a class from this project, into a \'classpath.txt\' file in the current directory'
     String rt = ""
     configurations.runtime.files.each{File file -> rt+=(file.path+File.pathSeparator) }
-    new File("classpath.txt").write(sourceSets.main.output.classesDir.path + File.pathSeparator + rt + File.pathSeparator + '/Users/matthieu/apache-s4-0.5.0-incubating-bin' + '/subprojects/s4-tools/build/install/s4-tools/lib/*')
+    new File("classpath.txt").write(sourceSets.main.output.classesDir.path + File.pathSeparator + rt)
 }

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/8117de6d/subprojects/s4-benchmarks/src/main/java/org/apache/s4/benchmark/dag/DagApp.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-benchmarks/src/main/java/org/apache/s4/benchmark/dag/DagApp.java b/subprojects/s4-benchmarks/src/main/java/org/apache/s4/benchmark/dag/DagApp.java
new file mode 100644
index 0000000..c8f463d
--- /dev/null
+++ b/subprojects/s4-benchmarks/src/main/java/org/apache/s4/benchmark/dag/DagApp.java
@@ -0,0 +1,109 @@
+package org.apache.s4.benchmark.dag;
+
+import java.io.File;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.s4.base.Event;
+import org.apache.s4.base.KeyFinder;
+import org.apache.s4.comm.topology.ZkClient;
+import org.apache.s4.core.App;
+import org.apache.s4.core.Stream;
+
+import com.google.common.collect.ImmutableList;
+import com.google.inject.Inject;
+import com.google.inject.name.Named;
+import com.yammer.metrics.reporting.ConsoleReporter;
+import com.yammer.metrics.reporting.CsvReporter;
+
+public class DagApp extends App {
+
+    @Inject
+    @Named("s4.cluster.zk_address")
+    String zkString;
+
+    @Override
+    protected void onStart() {
+        // TODO Auto-generated method stub
+    }
+
+    @Override
+    protected void onInit() {
+        File logDirectory = new File(System.getProperty("user.dir") + "/measurements/node"
+                + getReceiver().getPartitionId());
+        if (!logDirectory.exists()) {
+            if (!logDirectory.mkdirs()) {
+                throw new RuntimeException("Cannot create log dir " + logDirectory.getAbsolutePath());
+            }
+        }
+        CsvReporter.enable(logDirectory, 10, TimeUnit.SECONDS);
+        ConsoleReporter.enable(10, TimeUnit.SECONDS);
+
+        FirstPE inputPE = createPE(FirstPE.class, "firstPE");
+        ZkClient zkClient = new ZkClient(zkString);
+        zkClient.waitUntilExists("/benchmarkConfig/warmupIterations", TimeUnit.SECONDS, 60);
+
+        // TODO fix hardcoded cluster name (pass injector config?)
+        int nbInjectors = zkClient.countChildren("/s4/clusters/testCluster1/tasks");
+
+        createInputStream("inputStream", new KeyFinder<Event>() {
+
+            @Override
+            public List<String> get(Event event) {
+                return ImmutableList.of(String.valueOf(event.get("key")));
+            }
+        }, inputPE).setParallelism(1);
+
+        PipePE pe1 = createPE(PipePE.class, "pe1");
+
+        Stream<Event> pipe = createStream("firstPE->pe1", new KeyFinder<Event>() {
+
+            @Override
+            public List<String> get(Event event) {
+                return ImmutableList.of(String.valueOf(event.get("key")));
+            }
+        }, pe1).setParallelism(1);
+
+        inputPE.setDownstream(pipe);
+
+        PipePE pe2 = addPipePE(pe1, "pe1", "pe2");
+        PipePE pe3 = addPipePE(pe2, "pe2", "pe3");
+        PipePE pe4 = addPipePE(pe3, "pe3", "pe4");
+        PipePE pe5 = addPipePE(pe4, "pe4", "pe5");
+        PipePE pe6 = addPipePE(pe5, "pe5", "pe6");
+        PipePE pe7 = addPipePE(pe6, "pe6", "pe7");
+        PipePE pe8 = addPipePE(pe7, "pe7", "pe8");
+        PipePE pe9 = addPipePE(pe8, "pe8", "pe9");
+
+        LastPE endPE = createPE(LastPE.class, "endPE");
+        Stream<Event> endStream = createStream("pe9->endPE", endPE);
+        pe9.setDownstream(endStream);
+
+    }
+
+    private PipePE addPipePE(PipePE upstreamPE, String upstreamPEName, String peName) {
+        PipePE pe = createPE(PipePE.class, peName);
+
+        Stream<Event> pipe = createStream(upstreamPEName + "->" + peName, new KeyFinder<Event>() {
+
+            @Override
+            public List<String> get(Event event) {
+                return ImmutableList.of(String.valueOf(event.get("key")));
+            }
+        }, pe).setParallelism(1);
+
+        upstreamPE.setDownstream(pipe);
+        return pe;
+    }
+
+    @Override
+    protected void onClose() {
+        // TODO Auto-generated method stub
+
+    }
+
+    public String getZkString() {
+        return zkString;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/8117de6d/subprojects/s4-benchmarks/src/main/java/org/apache/s4/benchmark/dag/FirstPE.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-benchmarks/src/main/java/org/apache/s4/benchmark/dag/FirstPE.java b/subprojects/s4-benchmarks/src/main/java/org/apache/s4/benchmark/dag/FirstPE.java
new file mode 100644
index 0000000..b70e7b7
--- /dev/null
+++ b/subprojects/s4-benchmarks/src/main/java/org/apache/s4/benchmark/dag/FirstPE.java
@@ -0,0 +1,42 @@
+package org.apache.s4.benchmark.dag;
+
+import org.apache.s4.base.Event;
+import org.apache.s4.core.ProcessingElement;
+import org.apache.s4.core.Stream;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class FirstPE extends ProcessingElement {
+
+    private static Logger logger = LoggerFactory.getLogger(FirstPE.class);
+
+    private Stream<Event> downstream;
+
+    public void onEvent(Event event) {
+
+        Long value = event.get("value", long.class);
+        logger.trace("FirstPE : {} -> {}", getId(), value);
+        Event outputEvent = new Event();
+        // if we reuse the same key, with the same key finder, this event goes to the current node
+        outputEvent.put("key", int.class, event.get("key", int.class));
+        outputEvent.put("value", String.class, "forwarded - " + value);
+        downstream.put(outputEvent);
+    }
+
+    @Override
+    protected void onRemove() {
+        // TODO Auto-generated method stub
+
+    }
+
+    @Override
+    protected void onCreate() {
+        // TODO Auto-generated method stub
+
+    }
+
+    public void setDownstream(Stream<Event> downstream) {
+        this.downstream = downstream;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/8117de6d/subprojects/s4-benchmarks/src/main/java/org/apache/s4/benchmark/dag/LastPE.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-benchmarks/src/main/java/org/apache/s4/benchmark/dag/LastPE.java b/subprojects/s4-benchmarks/src/main/java/org/apache/s4/benchmark/dag/LastPE.java
new file mode 100644
index 0000000..c04df7e
--- /dev/null
+++ b/subprojects/s4-benchmarks/src/main/java/org/apache/s4/benchmark/dag/LastPE.java
@@ -0,0 +1,30 @@
+package org.apache.s4.benchmark.dag;
+
+import org.apache.s4.base.Event;
+import org.apache.s4.core.ProcessingElement;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class LastPE extends ProcessingElement {
+
+    private static Logger logger = LoggerFactory.getLogger(LastPE.class);
+
+    public void onEvent(Event event) {
+
+        String value = event.get("value", String.class);
+        logger.trace("LastPE : {} -> {}", getId(), value);
+    }
+
+    @Override
+    protected void onRemove() {
+        // TODO Auto-generated method stub
+
+    }
+
+    @Override
+    protected void onCreate() {
+        // TODO Auto-generated method stub
+
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/8117de6d/subprojects/s4-benchmarks/src/main/java/org/apache/s4/benchmark/dag/PipePE.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-benchmarks/src/main/java/org/apache/s4/benchmark/dag/PipePE.java b/subprojects/s4-benchmarks/src/main/java/org/apache/s4/benchmark/dag/PipePE.java
new file mode 100644
index 0000000..96587fc
--- /dev/null
+++ b/subprojects/s4-benchmarks/src/main/java/org/apache/s4/benchmark/dag/PipePE.java
@@ -0,0 +1,42 @@
+package org.apache.s4.benchmark.dag;
+
+import org.apache.s4.base.Event;
+import org.apache.s4.core.ProcessingElement;
+import org.apache.s4.core.Stream;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class PipePE extends ProcessingElement {
+
+    private static Logger logger = LoggerFactory.getLogger(PipePE.class);
+
+    private Stream<Event> downstream;
+
+    public void onEvent(Event event) {
+
+        String value = event.get("value", String.class);
+        logger.trace("PipePE : {} -> {}", getId(), value);
+        Event outputEvent = new Event();
+        // if we reuse the same key, with the same key finder, this event goes to the current node
+        outputEvent.put("key", int.class, event.get("key", int.class));
+        outputEvent.put("value", String.class, value + "->" + getId());
+        downstream.put(outputEvent);
+    }
+
+    @Override
+    protected void onRemove() {
+        // TODO Auto-generated method stub
+
+    }
+
+    @Override
+    protected void onCreate() {
+        // TODO Auto-generated method stub
+
+    }
+
+    public void setDownstream(Stream<Event> downstream) {
+        this.downstream = downstream;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/8117de6d/subprojects/s4-benchmarks/src/main/java/org/apache/s4/benchmark/prodcon/ProducerConsumerApp.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-benchmarks/src/main/java/org/apache/s4/benchmark/prodcon/ProducerConsumerApp.java b/subprojects/s4-benchmarks/src/main/java/org/apache/s4/benchmark/prodcon/ProducerConsumerApp.java
new file mode 100644
index 0000000..6465cf5
--- /dev/null
+++ b/subprojects/s4-benchmarks/src/main/java/org/apache/s4/benchmark/prodcon/ProducerConsumerApp.java
@@ -0,0 +1,79 @@
+package org.apache.s4.benchmark.prodcon;
+
+import java.io.File;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.s4.base.Event;
+import org.apache.s4.base.KeyFinder;
+import org.apache.s4.comm.topology.ZkClient;
+import org.apache.s4.core.App;
+
+import com.google.common.collect.ImmutableList;
+import com.google.inject.Inject;
+import com.google.inject.name.Named;
+import com.yammer.metrics.reporting.ConsoleReporter;
+import com.yammer.metrics.reporting.CsvReporter;
+
+public class ProducerConsumerApp extends App {
+
+    @Inject
+    @Named("s4.cluster.zk_address")
+    String zkString;
+
+    @Override
+    protected void onStart() {
+        // TODO Auto-generated method stub
+    }
+
+    @Override
+    protected void onInit() {
+        File logDirectory = new File(System.getProperty("user.dir") + "/measurements/node"
+                + getReceiver().getPartitionId());
+        if (!logDirectory.exists()) {
+            if (!logDirectory.mkdirs()) {
+                throw new RuntimeException("Cannot create log dir " + logDirectory.getAbsolutePath());
+            }
+        }
+        CsvReporter.enable(logDirectory, 10, TimeUnit.SECONDS);
+        ConsoleReporter.enable(10, TimeUnit.SECONDS);
+
+        SimplePE1 simplePE1 = createPE(SimplePE1.class, "simplePE1");
+        ZkClient zkClient = new ZkClient(zkString);
+        zkClient.waitUntilExists("/benchmarkConfig/warmupIterations", TimeUnit.SECONDS, 60);
+
+        // TODO fix hardcoded cluster name (pass injector config?)
+        int nbInjectors = zkClient.countChildren("/s4/clusters/testCluster1/tasks");
+        simplePE1.setNbInjectors(nbInjectors);
+
+        createInputStream("inputStream", new KeyFinder<Event>() {
+
+            @Override
+            public List<String> get(Event event) {
+                return ImmutableList.of(event.get("key"));
+            }
+        }, simplePE1).setParallelism(1);
+
+        SimplePE2 simplePE2 = createPE(SimplePE2.class, "simplePE2");
+
+        createInputStream("inputStream2", new KeyFinder<Event>() {
+
+            @Override
+            public List<String> get(Event event) {
+                return ImmutableList.of(event.get("key"));
+            }
+        }, simplePE2);
+
+    }
+
+    @Override
+    protected void onClose() {
+        // TODO Auto-generated method stub
+
+    }
+
+    public String getZkString() {
+        return zkString;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/8117de6d/subprojects/s4-benchmarks/src/main/java/org/apache/s4/benchmark/prodcon/SimplePE1.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-benchmarks/src/main/java/org/apache/s4/benchmark/prodcon/SimplePE1.java b/subprojects/s4-benchmarks/src/main/java/org/apache/s4/benchmark/prodcon/SimplePE1.java
new file mode 100644
index 0000000..8e5a5fc
--- /dev/null
+++ b/subprojects/s4-benchmarks/src/main/java/org/apache/s4/benchmark/prodcon/SimplePE1.java
@@ -0,0 +1,36 @@
+package org.apache.s4.benchmark.prodcon;
+
+import org.apache.s4.base.Event;
+import org.apache.s4.core.ProcessingElement;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class SimplePE1 extends ProcessingElement {
+
+    private static Logger logger = LoggerFactory.getLogger(SimplePE1.class);
+
+    int nbInjectors;
+
+    public void setNbInjectors(int nbInjectors) {
+        this.nbInjectors = nbInjectors;
+    }
+
+    public void onEvent(Event event) {
+
+        Long value = event.get("value", long.class);
+        logger.trace(String.valueOf(value));
+    }
+
+    @Override
+    protected void onRemove() {
+        // TODO Auto-generated method stub
+
+    }
+
+    @Override
+    protected void onCreate() {
+        // TODO Auto-generated method stub
+
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/8117de6d/subprojects/s4-benchmarks/src/main/java/org/apache/s4/benchmark/prodcon/SimplePE2.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-benchmarks/src/main/java/org/apache/s4/benchmark/prodcon/SimplePE2.java b/subprojects/s4-benchmarks/src/main/java/org/apache/s4/benchmark/prodcon/SimplePE2.java
new file mode 100644
index 0000000..678710a
--- /dev/null
+++ b/subprojects/s4-benchmarks/src/main/java/org/apache/s4/benchmark/prodcon/SimplePE2.java
@@ -0,0 +1,30 @@
+package org.apache.s4.benchmark.prodcon;
+
+import org.apache.s4.base.Event;
+import org.apache.s4.core.ProcessingElement;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class SimplePE2 extends ProcessingElement {
+
+    private static Logger logger = LoggerFactory.getLogger(SimplePE2.class);
+
+    public void onEvent(Event event) {
+
+        Long value = event.get("value", long.class);
+        logger.trace(String.valueOf(value));
+    }
+
+    @Override
+    protected void onRemove() {
+        // TODO Auto-generated method stub
+
+    }
+
+    @Override
+    protected void onCreate() {
+        // TODO Auto-generated method stub
+
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/8117de6d/subprojects/s4-benchmarks/src/main/java/org/apache/s4/benchmark/simpleApp1/Injector.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-benchmarks/src/main/java/org/apache/s4/benchmark/simpleApp1/Injector.java b/subprojects/s4-benchmarks/src/main/java/org/apache/s4/benchmark/simpleApp1/Injector.java
deleted file mode 100644
index 5463703..0000000
--- a/subprojects/s4-benchmarks/src/main/java/org/apache/s4/benchmark/simpleApp1/Injector.java
+++ /dev/null
@@ -1,176 +0,0 @@
-package org.apache.s4.benchmark.simpleApp1;
-
-import java.io.File;
-import java.math.BigDecimal;
-import java.math.MathContext;
-import java.util.List;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicLong;
-
-import org.apache.s4.base.Event;
-import org.apache.s4.base.KeyFinder;
-import org.apache.s4.core.RemoteStream;
-import org.apache.s4.core.adapter.AdapterApp;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.collect.ImmutableList;
-import com.google.inject.Inject;
-import com.google.inject.name.Named;
-import com.yammer.metrics.Metrics;
-import com.yammer.metrics.core.Gauge;
-import com.yammer.metrics.reporting.ConsoleReporter;
-
-public class Injector extends AdapterApp {
-
-    private static Logger logger = LoggerFactory.getLogger(Injector.class);
-
-    @Inject
-    @Named("s4.benchmark.warmupIterations")
-    long warmupIterations;
-
-    @Inject
-    @Named("s4.benchmark.testIterations")
-    long testIterations;
-
-    @Inject
-    @Named("s4.benchmark.keysCount")
-    int keysCount;
-
-    @Inject
-    @Named("s4.benchmark.warmupSleepInterval")
-    int warmupSleepInterval;
-
-    @Inject
-    @Named("s4.benchmark.testSleepInterval")
-    int testSleepInterval;
-
-    @Inject
-    @Named("s4.cluster.zk_address")
-    String zkString;
-
-    @Inject
-    @Named("s4.benchmark.injector.parallelism")
-    int parallelism;
-
-    // Meter meter = Metrics.newMeter(Injector.class, "injector", "injected", TimeUnit.SECONDS);
-
-    static AtomicLong counter = new AtomicLong();
-    static AtomicLong eventCountPerInterval = new AtomicLong();
-    BigDecimal rate;
-    volatile long lastTime = -1;
-
-    @Override
-    protected void onInit() {
-
-        File logDir = new File(System.getProperty("user.dir") + "/measurements/injectors");
-        if (!logDir.mkdirs()) {
-            logger.debug("Cannot create dir " + logDir.getAbsolutePath());
-        }
-        // CsvReporter.enable(logDir, 5, TimeUnit.SECONDS);
-        remoteStreamKeyFinder = new KeyFinder<Event>() {
-
-            @Override
-            public List<String> get(Event event) {
-                return ImmutableList.of(event.get("key"));
-            }
-        };
-        super.onInit();
-        ConsoleReporter.enable(30, TimeUnit.SECONDS);
-    }
-
-    @Override
-    protected void onStart() {
-
-        Executors.newScheduledThreadPool(1).scheduleAtFixedRate(new Runnable() {
-
-            @Override
-            public void run() {
-                if (lastTime == -1) {
-                    lastTime = System.currentTimeMillis();
-                } else {
-                    if ((System.currentTimeMillis() - lastTime) > 1000) {
-                        rate = new BigDecimal(eventCountPerInterval.getAndSet(0)).divide(
-                                new BigDecimal(System.currentTimeMillis() - lastTime), MathContext.DECIMAL64).multiply(
-                                new BigDecimal(1000), MathContext.DECIMAL64);
-                        lastTime = System.currentTimeMillis();
-                    }
-                }
-
-            }
-        }, 1, 1, TimeUnit.SECONDS);
-
-        Metrics.newGauge(Injector.class, "injection-rate", new Gauge<BigDecimal>() {
-
-            @Override
-            public BigDecimal value() {
-                return rate;
-            }
-        });
-
-        RemoteStream remoteStream = getRemoteStream();
-
-        counter.set(0);
-
-        generateEvents(remoteStream, testIterations, keysCount, testSleepInterval, parallelism);
-
-        logger.info("Tests completed after test events", testIterations * parallelism * keysCount);
-
-        System.exit(0);
-    }
-
-    private void generateEvents(RemoteStream remoteStream, long iterations, int keysCount, int sleepInterval,
-            int parallelism) {
-
-        ExecutorService threadPool = Executors.newFixedThreadPool(parallelism);
-        for (int i = 0; i < parallelism; i++) {
-            threadPool.submit(new InjectionTask(iterations, remoteStream, sleepInterval));
-        }
-
-        threadPool.shutdown();
-        try {
-            threadPool.awaitTermination(10, TimeUnit.MINUTES);
-        } catch (InterruptedException e) {
-            e.printStackTrace();
-        }
-    }
-
-    class InjectionTask implements Runnable {
-
-        private long iterations;
-        private RemoteStream remoteStream;
-        private long sleepInterval;
-
-        public InjectionTask(long iterations, RemoteStream remoteStream, long sleepInterval) {
-            super();
-            this.iterations = iterations;
-            this.remoteStream = remoteStream;
-            this.sleepInterval = sleepInterval;
-
-        }
-
-        @Override
-        public void run() {
-            for (long i = 0; i < iterations; i++) {
-                for (int j = 0; j < keysCount; j++) {
-                    Event event = new Event();
-                    event.put("key", int.class, j);
-                    event.put("value", long.class, counter.incrementAndGet());
-                    event.put("injector", Integer.class, getReceiver().getPartitionId());
-                    // logger.info("{}/{}/{}/",
-                    // new String[] { Thread.currentThread().getName(), String.valueOf(i), String.valueOf(j),
-                    // String.valueOf(event.get("value")) });
-                    remoteStream.put(event);
-                    eventCountPerInterval.incrementAndGet();
-                    try {
-                        Thread.sleep(sleepInterval);
-                    } catch (InterruptedException e) {
-                        e.printStackTrace();
-                    }
-                }
-            }
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/8117de6d/subprojects/s4-benchmarks/src/main/java/org/apache/s4/benchmark/simpleApp1/SimpleApp.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-benchmarks/src/main/java/org/apache/s4/benchmark/simpleApp1/SimpleApp.java b/subprojects/s4-benchmarks/src/main/java/org/apache/s4/benchmark/simpleApp1/SimpleApp.java
deleted file mode 100644
index c2fde2b..0000000
--- a/subprojects/s4-benchmarks/src/main/java/org/apache/s4/benchmark/simpleApp1/SimpleApp.java
+++ /dev/null
@@ -1,79 +0,0 @@
-package org.apache.s4.benchmark.simpleApp1;
-
-import java.io.File;
-import java.util.List;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.s4.base.Event;
-import org.apache.s4.base.KeyFinder;
-import org.apache.s4.comm.topology.ZkClient;
-import org.apache.s4.core.App;
-
-import com.google.common.collect.ImmutableList;
-import com.google.inject.Inject;
-import com.google.inject.name.Named;
-import com.yammer.metrics.reporting.ConsoleReporter;
-import com.yammer.metrics.reporting.CsvReporter;
-
-public class SimpleApp extends App {
-
-    @Inject
-    @Named("s4.cluster.zk_address")
-    String zkString;
-
-    @Override
-    protected void onStart() {
-        // TODO Auto-generated method stub
-    }
-
-    @Override
-    protected void onInit() {
-        File logDirectory = new File(System.getProperty("user.dir") + "/measurements/node"
-                + getReceiver().getPartitionId());
-        if (!logDirectory.exists()) {
-            if (!logDirectory.mkdirs()) {
-                throw new RuntimeException("Cannot create log dir " + logDirectory.getAbsolutePath());
-            }
-        }
-        CsvReporter.enable(logDirectory, 10, TimeUnit.SECONDS);
-        ConsoleReporter.enable(10, TimeUnit.SECONDS);
-
-        SimplePE1 simplePE1 = createPE(SimplePE1.class, "simplePE1");
-        ZkClient zkClient = new ZkClient(zkString);
-        zkClient.waitUntilExists("/benchmarkConfig/warmupIterations", TimeUnit.SECONDS, 60);
-
-        // TODO fix hardcoded cluster name (pass injector config?)
-        int nbInjectors = zkClient.countChildren("/s4/clusters/testCluster1/tasks");
-        simplePE1.setNbInjectors(nbInjectors);
-
-        createInputStream("inputStream", new KeyFinder<Event>() {
-
-            @Override
-            public List<String> get(Event event) {
-                return ImmutableList.of(event.get("key"));
-            }
-        }, simplePE1).setParallelism(1);
-
-        SimplePE2 simplePE2 = createPE(SimplePE2.class, "simplePE2");
-
-        createInputStream("inputStream2", new KeyFinder<Event>() {
-
-            @Override
-            public List<String> get(Event event) {
-                return ImmutableList.of(event.get("key"));
-            }
-        }, simplePE2);
-
-    }
-
-    @Override
-    protected void onClose() {
-        // TODO Auto-generated method stub
-
-    }
-
-    public String getZkString() {
-        return zkString;
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/8117de6d/subprojects/s4-benchmarks/src/main/java/org/apache/s4/benchmark/simpleApp1/SimplePE1.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-benchmarks/src/main/java/org/apache/s4/benchmark/simpleApp1/SimplePE1.java b/subprojects/s4-benchmarks/src/main/java/org/apache/s4/benchmark/simpleApp1/SimplePE1.java
deleted file mode 100644
index 49f0868..0000000
--- a/subprojects/s4-benchmarks/src/main/java/org/apache/s4/benchmark/simpleApp1/SimplePE1.java
+++ /dev/null
@@ -1,36 +0,0 @@
-package org.apache.s4.benchmark.simpleApp1;
-
-import org.apache.s4.base.Event;
-import org.apache.s4.core.ProcessingElement;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class SimplePE1 extends ProcessingElement {
-
-    private static Logger logger = LoggerFactory.getLogger(SimplePE1.class);
-
-    int nbInjectors;
-
-    public void setNbInjectors(int nbInjectors) {
-        this.nbInjectors = nbInjectors;
-    }
-
-    public void onEvent(Event event) {
-
-        Long value = event.get("value", long.class);
-        logger.trace(String.valueOf(value));
-    }
-
-    @Override
-    protected void onRemove() {
-        // TODO Auto-generated method stub
-
-    }
-
-    @Override
-    protected void onCreate() {
-        // TODO Auto-generated method stub
-
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/8117de6d/subprojects/s4-benchmarks/src/main/java/org/apache/s4/benchmark/simpleApp1/SimplePE2.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-benchmarks/src/main/java/org/apache/s4/benchmark/simpleApp1/SimplePE2.java b/subprojects/s4-benchmarks/src/main/java/org/apache/s4/benchmark/simpleApp1/SimplePE2.java
deleted file mode 100644
index 96bff6e..0000000
--- a/subprojects/s4-benchmarks/src/main/java/org/apache/s4/benchmark/simpleApp1/SimplePE2.java
+++ /dev/null
@@ -1,30 +0,0 @@
-package org.apache.s4.benchmark.simpleApp1;
-
-import org.apache.s4.base.Event;
-import org.apache.s4.core.ProcessingElement;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class SimplePE2 extends ProcessingElement {
-
-    private static Logger logger = LoggerFactory.getLogger(SimplePE2.class);
-
-    public void onEvent(Event event) {
-
-        Long value = event.get("value", long.class);
-        logger.trace(String.valueOf(value));
-    }
-
-    @Override
-    protected void onRemove() {
-        // TODO Auto-generated method stub
-
-    }
-
-    @Override
-    protected void onCreate() {
-        // TODO Auto-generated method stub
-
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/8117de6d/subprojects/s4-benchmarks/src/main/java/org/apache/s4/benchmark/utils/Injector.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-benchmarks/src/main/java/org/apache/s4/benchmark/utils/Injector.java b/subprojects/s4-benchmarks/src/main/java/org/apache/s4/benchmark/utils/Injector.java
new file mode 100644
index 0000000..769ccaa
--- /dev/null
+++ b/subprojects/s4-benchmarks/src/main/java/org/apache/s4/benchmark/utils/Injector.java
@@ -0,0 +1,174 @@
+package org.apache.s4.benchmark.utils;
+
+import java.io.File;
+import java.math.BigDecimal;
+import java.math.MathContext;
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.s4.base.Event;
+import org.apache.s4.base.KeyFinder;
+import org.apache.s4.core.RemoteStream;
+import org.apache.s4.core.adapter.AdapterApp;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.ImmutableList;
+import com.google.inject.Inject;
+import com.google.inject.name.Named;
+import com.yammer.metrics.Metrics;
+import com.yammer.metrics.core.Gauge;
+import com.yammer.metrics.reporting.ConsoleReporter;
+
+public class Injector extends AdapterApp {
+
+    private static Logger logger = LoggerFactory.getLogger(Injector.class);
+
+    @Inject
+    @Named("s4.benchmark.testIterations")
+    long testIterations;
+
+    @Inject
+    @Named("s4.benchmark.keysCount")
+    int keysCount;
+
+    @Inject
+    @Named("s4.benchmark.pauseTimeMs")
+    int pauseTimeMs;
+
+    @Inject
+    @Named("s4.benchmark.injector.iterationsBeforePause")
+    int iterationsBeforePause;
+
+    @Inject
+    @Named("s4.cluster.zk_address")
+    String zkString;
+
+    @Inject
+    @Named("s4.benchmark.injector.parallelism")
+    int parallelism;
+
+    // Meter meter = Metrics.newMeter(Injector.class, "injector", "injected", TimeUnit.SECONDS);
+
+    static AtomicLong counter = new AtomicLong(); // events emitted by all injection tasks since onStart reset it
+    static AtomicLong eventCountPerInterval = new AtomicLong(); // events emitted since the last rate sample
+    volatile BigDecimal rate; // written by the sampling thread, read by the metrics gauge; null until first sample
+    volatile long lastTime = -1; // time of the last rate sample in ms; -1 until the first scheduler tick
+
+    @Override
+    protected void onInit() {
+        // mkdirs() also returns false when the directory already exists, so only report a genuine failure.
+        File logDir = new File(System.getProperty("user.dir") + "/measurements/injectors");
+        if (!logDir.mkdirs() && !logDir.isDirectory()) {
+            logger.debug("Cannot create dir {}", logDir.getAbsolutePath());
+        }
+        // CsvReporter.enable(logDir, 5, TimeUnit.SECONDS);
+        remoteStreamKeyFinder = new KeyFinder<Event>() {
+            // The remote stream key is the event's "key" attribute.
+            @Override
+            public List<String> get(Event event) {
+                return ImmutableList.of(event.get("key"));
+            }
+        };
+        super.onInit();
+        ConsoleReporter.enable(30, TimeUnit.SECONDS);
+    }
+
+    @Override
+    protected void onStart() {
+        // Sample the injection rate every second, expose it as a gauge, then inject all events and exit.
+        Executors.newScheduledThreadPool(1).scheduleAtFixedRate(new Runnable() {
+            // The first tick only records a start time; later ticks turn the event count into events/second.
+            @Override
+            public void run() {
+                if (lastTime == -1) {
+                    lastTime = System.currentTimeMillis();
+                } else {
+                    if ((System.currentTimeMillis() - lastTime) > 1000) {
+                        rate = new BigDecimal(eventCountPerInterval.getAndSet(0)).divide(
+                                new BigDecimal(System.currentTimeMillis() - lastTime), MathContext.DECIMAL64).multiply(
+                                new BigDecimal(1000), MathContext.DECIMAL64);
+                        lastTime = System.currentTimeMillis();
+                    }
+                }
+
+            }
+        }, 1, 1, TimeUnit.SECONDS);
+        // Expose the most recently computed rate as the "injection-rate" gauge.
+        Metrics.newGauge(Injector.class, "injection-rate", new Gauge<BigDecimal>() {
+
+            @Override
+            public BigDecimal value() {
+                return rate;
+            }
+        });
+
+        RemoteStream remoteStream = getRemoteStream();
+
+        counter.set(0);
+
+        generateEvents(remoteStream, testIterations, keysCount, parallelism);
+        // The count needs a {} placeholder, otherwise SLF4J silently drops the argument.
+        logger.info("Tests completed after {} test events", testIterations * parallelism * keysCount);
+
+        System.exit(0);
+    }
+
+    private void generateEvents(RemoteStream remoteStream, long iterations, int keysCount, int parallelism) {
+        // Run one InjectionTask per thread and block until all of them finish (at most 10 minutes).
+        ExecutorService threadPool = Executors.newFixedThreadPool(parallelism);
+        for (int i = 0; i < parallelism; i++) {
+            threadPool.submit(new InjectionTask(iterations, remoteStream));
+        }
+        // No further tasks are accepted; if the wait is interrupted, keep the interrupt visible to the caller.
+        threadPool.shutdown();
+        try {
+            threadPool.awaitTermination(10, TimeUnit.MINUTES);
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+        }
+    }
+
+    class InjectionTask implements Runnable {
+        // Emits iterations x keysCount events on the remote stream, with periodic pauses for throttling.
+        private final long iterations;
+        private final RemoteStream remoteStream;
+
+        public InjectionTask(long iterations, RemoteStream remoteStream) {
+            super();
+            this.iterations = iterations;
+            this.remoteStream = remoteStream;
+        }
+
+        @Override
+        public void run() {
+            for (long i = 0; i < iterations; i++) {
+                for (int j = 0; j < keysCount; j++) {
+                    long currentCount = counter.incrementAndGet();
+                    Event event = new Event();
+                    event.put("key", int.class, j);
+                    event.put("value", long.class, currentCount);
+                    event.put("injector", Integer.class, getReceiver().getPartitionId());
+                    // logger.info("{}/{}/{}/",
+                    // new String[] { Thread.currentThread().getName(), String.valueOf(i), String.valueOf(j),
+                    // String.valueOf(event.get("value")) });
+                    remoteStream.put(event);
+                    eventCountPerInterval.incrementAndGet();
+                    // Throttle: sleep each time the shared counter reaches a multiple of iterationsBeforePause.
+                    try {
+                        if ((currentCount % iterationsBeforePause) == 0) {
+                            Thread.sleep(pauseTimeMs);
+                        }
+                    } catch (InterruptedException e) {
+                        Thread.currentThread().interrupt();
+                        return;
+                    }
+                }
+            }
+
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/8117de6d/subprojects/s4-comm/src/main/java/org/apache/s4/comm/DefaultCommModule.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/DefaultCommModule.java b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/DefaultCommModule.java
index e444e1e..a0612cf 100644
--- a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/DefaultCommModule.java
+++ b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/DefaultCommModule.java
@@ -31,6 +31,7 @@ import org.apache.s4.base.RemoteEmitter;
 import org.apache.s4.base.SerializerDeserializer;
 import org.apache.s4.comm.serialize.KryoSerDeser;
 import org.apache.s4.comm.serialize.SerializerDeserializerFactory;
+import org.apache.s4.comm.staging.MemoryAwareDeserializerExecutorFactory;
 import org.apache.s4.comm.tcp.RemoteEmitters;
 import org.apache.s4.comm.topology.Assignment;
 import org.apache.s4.comm.topology.AssignmentFromZK;
@@ -101,7 +102,7 @@ public class DefaultCommModule extends AbstractModule {
 
         bind(RemoteEmitters.class);
 
-        bind(DeserializerExecutorFactory.class).to(DefaultDeserializerExecutorFactory.class);
+        bind(DeserializerExecutorFactory.class).to(MemoryAwareDeserializerExecutorFactory.class);
 
         try {
             Class<? extends Emitter> emitterClass = (Class<? extends Emitter>) Class.forName(config

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/8117de6d/subprojects/s4-comm/src/main/java/org/apache/s4/comm/DefaultDeserializerExecutorFactory.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/DefaultDeserializerExecutorFactory.java b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/DefaultDeserializerExecutorFactory.java
deleted file mode 100644
index bd359db..0000000
--- a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/DefaultDeserializerExecutorFactory.java
+++ /dev/null
@@ -1,34 +0,0 @@
-package org.apache.s4.comm;
-
-import java.util.concurrent.Executor;
-import java.util.concurrent.TimeUnit;
-
-import org.jboss.netty.handler.execution.OrderedMemoryAwareThreadPoolExecutor;
-
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
-import com.google.inject.Inject;
-import com.google.inject.name.Named;
-
-/**
- * Default executor factory implementation for deserialization stage.
- * 
- * 
- * 
- */
-public class DefaultDeserializerExecutorFactory implements DeserializerExecutorFactory {
-
-    @Named("s4.listener.maxMemoryPerChannel")
-    @Inject(optional = true)
-    int maxMemoryPerChannel = 100000;
-
-    @Named("s4.listener.maxMemoryPerExecutor")
-    @Inject(optional = true)
-    int maxMemoryPerExecutor = 100000;
-
-    @Override
-    public Executor create() {
-        // NOTE: these are suggested defaults but they might require application-specific tuning
-        return new OrderedMemoryAwareThreadPoolExecutor(1, maxMemoryPerChannel, maxMemoryPerExecutor, 60,
-                TimeUnit.SECONDS, new ThreadFactoryBuilder().setNameFormat("listener-deserializer-%d").build());
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/8117de6d/subprojects/s4-comm/src/main/java/org/apache/s4/comm/DeserializerExecutorFactory.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/DeserializerExecutorFactory.java b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/DeserializerExecutorFactory.java
index 701ed22..b1cee4d 100644
--- a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/DeserializerExecutorFactory.java
+++ b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/DeserializerExecutorFactory.java
@@ -4,6 +4,16 @@ import java.util.concurrent.Executor;
 
 /**
  * Factory for deserializer executors used in listener pipelines.
+ * <p>
+ * Deserialization is a relatively costly operation, depending on the event type. This operation can be parallelized,
+ * and we provide channel workers an executor for that purpose.
+ * <p>
+ * There are many possible implementations, that may consider various factors, in particular:
+ * <ul>
+ * <li>parallelism
+ * <li>memory usage
+ * <li>sharing threadpool among channel workers
+ * </ul>
  * 
  */
 public interface DeserializerExecutorFactory {

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/8117de6d/subprojects/s4-comm/src/main/java/org/apache/s4/comm/ThrottlingThreadPoolExecutorService.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/ThrottlingThreadPoolExecutorService.java b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/ThrottlingThreadPoolExecutorService.java
deleted file mode 100644
index 45659ac..0000000
--- a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/ThrottlingThreadPoolExecutorService.java
+++ /dev/null
@@ -1,185 +0,0 @@
-package org.apache.s4.comm;
-
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.Callable;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.RejectedExecutionHandler;
-import java.util.concurrent.Semaphore;
-import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.util.concurrent.ForwardingListeningExecutorService;
-import com.google.common.util.concurrent.ListenableFuture;
-import com.google.common.util.concurrent.ListeningExecutorService;
-import com.google.common.util.concurrent.MoreExecutors;
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
-
-/**
- * This thread pool executor throttles the submission of new tasks by using a semaphore. Task submissions require
- * permits, task completions release permits.
- * 
- * NOTE: you should either use the {@link ThrottlingThreadPoolExecutorService#submit(java.util.concurrent.Callable)}
- * methods or the {@link ThrottlingThreadPoolExecutorService#execute(Runnable)} method.
- * 
- */
-public class ThrottlingThreadPoolExecutorService extends ForwardingListeningExecutorService {
-
-    private static Logger logger = LoggerFactory.getLogger(ThrottlingThreadPoolExecutorService.class);
-
-    int parallelism;
-    String streamName;
-    final ClassLoader classLoader;
-    int workQueueSize;
-    private BlockingQueue<Runnable> workQueue;
-    private Semaphore queueingPermits;
-    private ListeningExecutorService executorDelegatee;
-
-    /**
-     * 
-     * @param parallelism
-     *            Maximum number of threads in the pool
-     * @param fairParallelism
-     *            If true, in case of contention, waiting threads will be scheduled in a first-in first-out manner. This
-     *            can be help ensure ordering, though there is an associated performance cost (typically small).
-     * @param threadName
-     *            Naming scheme
-     * @param workQueueSize
-     *            Queue capacity
-     * @param classLoader
-     *            ClassLoader used as contextClassLoader for spawned threads
-     */
-    public ThrottlingThreadPoolExecutorService(int parallelism, boolean fairParallelism, String threadName,
-            int workQueueSize, final ClassLoader classLoader) {
-        super();
-        this.parallelism = parallelism;
-        this.streamName = threadName;
-        this.classLoader = classLoader;
-        this.workQueueSize = workQueueSize;
-        queueingPermits = new Semaphore(workQueueSize + parallelism, false);
-        ThreadFactory threadFactory = new ThreadFactoryBuilder().setDaemon(true).setNameFormat(threadName)
-                .setThreadFactory(new ThreadFactory() {
-
-                    @Override
-                    public Thread newThread(Runnable r) {
-                        Thread t = new Thread(r);
-                        t.setContextClassLoader(classLoader);
-                        return t;
-                    }
-                }).build();
-        // queueingPermits semaphore controls the size of the queue, thus no need to use a bounded queue
-        workQueue = new LinkedBlockingQueue<Runnable>(workQueueSize + parallelism);
-        ThreadPoolExecutor eventProcessingExecutor = new ThreadPoolExecutor(parallelism, parallelism, 60,
-                TimeUnit.SECONDS, workQueue, threadFactory, new RejectedExecutionHandler() {
-
-                    @Override
-                    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
-                        // This is not expected to happen.
-                        logger.error("Could not submit task to executor {}", executor.toString());
-                    }
-                });
-        ((ThreadPoolExecutor) eventProcessingExecutor).allowCoreThreadTimeOut(true);
-        executorDelegatee = MoreExecutors.listeningDecorator(eventProcessingExecutor);
-
-    }
-
-    @Override
-    protected ListeningExecutorService delegate() {
-        return executorDelegatee;
-    }
-
-    @Override
-    public <T> ListenableFuture<T> submit(Callable<T> task) {
-        try {
-            queueingPermits.acquire();
-        } catch (InterruptedException e) {
-            e.printStackTrace();
-            Thread.currentThread().interrupt();
-        }
-        ListenableFuture<T> future = super.submit(new CallableWithPermitRelease<T>(task));
-        return future;
-    }
-
-    @Override
-    public <T> ListenableFuture<T> submit(Runnable task, T result) {
-        try {
-            queueingPermits.acquire();
-        } catch (InterruptedException e) {
-            Thread.currentThread().interrupt();
-        }
-        ListenableFuture<T> future = super.submit(new RunnableWithPermitRelease(task), result);
-        return future;
-    }
-
-    @Override
-    public ListenableFuture<?> submit(Runnable task) {
-        try {
-            queueingPermits.acquire();
-        } catch (InterruptedException e) {
-            Thread.currentThread().interrupt();
-        }
-        ListenableFuture<?> future = super.submit(new RunnableWithPermitRelease(task));
-        return future;
-    }
-
-    @Override
-    public void execute(Runnable command) {
-        try {
-            queueingPermits.acquire();
-        } catch (InterruptedException e) {
-            e.printStackTrace();
-            Thread.currentThread().interrupt();
-        }
-        super.execute(new RunnableWithPermitRelease(command));
-    }
-
-    /**
-     * Releases a permit after the task is executed
-     * 
-     */
-    class RunnableWithPermitRelease implements Runnable {
-
-        Runnable delegatee;
-
-        public RunnableWithPermitRelease(Runnable delegatee) {
-            this.delegatee = delegatee;
-        }
-
-        @Override
-        public void run() {
-            try {
-                delegatee.run();
-            } finally {
-                queueingPermits.release();
-            }
-
-        }
-    }
-
-    /**
-     * Releases a permit after the task is completed
-     * 
-     */
-    class CallableWithPermitRelease<T> implements Callable<T> {
-
-        Callable<T> delegatee;
-
-        public CallableWithPermitRelease(Callable<T> delegatee) {
-            this.delegatee = delegatee;
-        }
-
-        @Override
-        public T call() throws Exception {
-            try {
-                return delegatee.call();
-            } finally {
-                queueingPermits.release();
-            }
-        }
-
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/8117de6d/subprojects/s4-comm/src/main/java/org/apache/s4/comm/staging/MemoryAwareDeserializerExecutorFactory.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/staging/MemoryAwareDeserializerExecutorFactory.java b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/staging/MemoryAwareDeserializerExecutorFactory.java
new file mode 100644
index 0000000..39f6d70
--- /dev/null
+++ b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/staging/MemoryAwareDeserializerExecutorFactory.java
@@ -0,0 +1,44 @@
+package org.apache.s4.comm.staging;
+
+import java.util.concurrent.Executor;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.s4.comm.DeserializerExecutorFactory;
+import org.jboss.netty.handler.execution.OrderedMemoryAwareThreadPoolExecutor;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import com.google.inject.Inject;
+import com.google.inject.name.Named;
+
+/**
+ * Executors factory for the deserialization stage that blocks incoming tasks when memory consumed by events reaches a
+ * given threshold.
+ * <p>
+ * It uses a single-threaded, memory-aware thread pool executor. There is 1 threadpool for each channel.
+ * <p>
+ * The memory limit for the threadpool can be configured per channel and per executor (with a single-threaded executor,
+ * the lower of those two values is the effective limit).
+ * 
+ * 
+ * 
+ */
+public class MemoryAwareDeserializerExecutorFactory implements DeserializerExecutorFactory {
+    // Optional injection: the initializer below is kept when the named property is not bound.
+    @Named("s4.listener.maxMemoryPerChannel")
+    @Inject(optional = true)
+    protected int maxMemoryPerChannel = 1000000;
+    // Optional injection: the initializer below is kept when the named property is not bound.
+    @Named("s4.listener.maxMemoryPerExecutor")
+    @Inject(optional = true)
+    protected int maxMemoryPerExecutor = 1000000;
+
+    @Override
+    public Executor create() { // builds a new executor on every call and logs the two configured limits
+        LoggerFactory.getLogger(getClass()).info(
+                "Creating an OMATPE with maxmemperchannel= {} and maxmemperexecutor= {}", maxMemoryPerChannel,
+                maxMemoryPerExecutor);
+        return new OrderedMemoryAwareThreadPoolExecutor(1, maxMemoryPerChannel, maxMemoryPerExecutor, 60,
+                TimeUnit.SECONDS, new ThreadFactoryBuilder().setNameFormat("listener-deserializer-%d").build());
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/8117de6d/subprojects/s4-comm/src/main/java/org/apache/s4/comm/staging/SharedThrottlingDeserializerExecutorFactory.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/staging/SharedThrottlingDeserializerExecutorFactory.java b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/staging/SharedThrottlingDeserializerExecutorFactory.java
new file mode 100644
index 0000000..48ff4f9
--- /dev/null
+++ b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/staging/SharedThrottlingDeserializerExecutorFactory.java
@@ -0,0 +1,36 @@
+package org.apache.s4.comm.staging;
+
+import java.util.concurrent.Executor;
+
+import org.apache.s4.comm.DeserializerExecutorFactory;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A factory for deserializer executors that returns a unique thread pool, shared among channels. This can be useful
+ * when there are many inbound channels and you need to limit the number of threads in the node.
+ * 
+ */
+public class SharedThrottlingDeserializerExecutorFactory implements DeserializerExecutorFactory {
+    // Passed to the shared executor's constructor as its queue capacity, and reported in the log message.
+    private static final int QUEUE_CAPACITY = 100000;
+    // Enum singleton: the executor is created once, when INSTANCE is initialized.
+    enum ThrottlingExecutorSingleton {
+        INSTANCE;
+
+        ThrottlingThreadPoolExecutorService executor;
+        // Sized to the number of available processors; uses the initializing thread's context class loader.
+        private ThrottlingExecutorSingleton() {
+            this.executor = new ThrottlingThreadPoolExecutorService(Runtime.getRuntime().availableProcessors(), true,
+                    "listener-deserializer-%d", QUEUE_CAPACITY, Thread.currentThread().getContextClassLoader());
+        }
+    }
+    // Every call logs the message below but returns the same shared executor instance.
+    @Override
+    public Executor create() {
+        LoggerFactory
+                .getLogger(getClass())
+                .info("Creating a shared (i.e. singleton, shared across netty channel workers) throttling thread pool with queue capacity of {}",
+                        QUEUE_CAPACITY);
+        return ThrottlingExecutorSingleton.INSTANCE.executor;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/8117de6d/subprojects/s4-comm/src/main/java/org/apache/s4/comm/staging/ThrottlingThreadPoolExecutorService.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/staging/ThrottlingThreadPoolExecutorService.java b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/staging/ThrottlingThreadPoolExecutorService.java
new file mode 100644
index 0000000..a805f21
--- /dev/null
+++ b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/staging/ThrottlingThreadPoolExecutorService.java
@@ -0,0 +1,212 @@
+package org.apache.s4.comm.staging;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.RejectedExecutionHandler;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.util.concurrent.ForwardingListeningExecutorService;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
+/**
+ * This thread pool executor throttles the submission of new tasks by using a semaphore. Task submissions require
+ * permits, task completions release permits. When no permit is available, the submitting thread blocks until a
+ * permit is released.
+ * <p>
+ * NOTE: you should either use the {@link ThrottlingThreadPoolExecutorService#submit(java.util.concurrent.Callable)}
+ * methods or the {@link ThrottlingThreadPoolExecutorService#execute(Runnable)} method. The <code>invokeAll</code> and
+ * <code>invokeAny</code> methods are not supported.
+ * 
+ */
+public class ThrottlingThreadPoolExecutorService extends ForwardingListeningExecutorService {
+
+    private static final Logger logger = LoggerFactory.getLogger(ThrottlingThreadPoolExecutorService.class);
+
+    int parallelism;
+    String streamName;
+    final ClassLoader classLoader;
+    int workQueueSize;
+    private final BlockingQueue<Runnable> workQueue;
+    private final Semaphore queueingPermits;
+    private final ListeningExecutorService executorDelegatee;
+
+    /**
+     * 
+     * @param parallelism
+     *            Maximum number of threads in the pool
+     * @param fairParallelism
+     *            If true, in case of contention, waiting threads will be scheduled in a first-in first-out manner. This
+     *            can help ensure ordering, though there is an associated performance cost (typically small).
+     * @param threadName
+     *            Naming scheme
+     * @param workQueueSize
+     *            Queue capacity
+     * @param classLoader
+     *            ClassLoader used as contextClassLoader for spawned threads
+     */
+    public ThrottlingThreadPoolExecutorService(int parallelism, boolean fairParallelism, String threadName,
+            int workQueueSize, final ClassLoader classLoader) {
+        super();
+        this.parallelism = parallelism;
+        this.streamName = threadName;
+        this.classLoader = classLoader;
+        this.workQueueSize = workQueueSize;
+        // one permit per task that is either queued or running; fairness follows the documented parameter
+        queueingPermits = new Semaphore(workQueueSize + parallelism, fairParallelism);
+        ThreadFactory threadFactory = new ThreadFactoryBuilder().setDaemon(true).setNameFormat(threadName)
+                .setThreadFactory(new ThreadFactory() {
+
+                    @Override
+                    public Thread newThread(Runnable r) {
+                        Thread t = new Thread(r);
+                        t.setContextClassLoader(classLoader);
+                        return t;
+                    }
+                }).build();
+        // the queueingPermits semaphore already limits the number of pending tasks to workQueueSize + parallelism;
+        // the queue is given the same capacity as a safety net
+        workQueue = new LinkedBlockingQueue<Runnable>(workQueueSize + parallelism);
+        ThreadPoolExecutor eventProcessingExecutor = new ThreadPoolExecutor(parallelism, parallelism, 60,
+                TimeUnit.SECONDS, workQueue, threadFactory, new RejectedExecutionHandler() {
+
+                    @Override
+                    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
+                        // This is not expected to happen.
+                        // NOTE: a rejected task never runs, so the permit it acquired (if any) is not released here.
+                        logger.error("Could not submit task to executor {}", executor.toString());
+                    }
+                });
+        eventProcessingExecutor.allowCoreThreadTimeOut(true);
+        executorDelegatee = MoreExecutors.listeningDecorator(eventProcessingExecutor);
+
+    }
+
+    @Override
+    protected ListeningExecutorService delegate() {
+        return executorDelegatee;
+    }
+
+    /**
+     * Blocks until a queueing permit is available.
+     * 
+     * @return true if a permit was acquired, false if the calling thread was interrupted while waiting (the interrupt
+     *         status is restored in that case)
+     */
+    private boolean acquirePermit() {
+        try {
+            queueingPermits.acquire();
+            return true;
+        } catch (InterruptedException e) {
+            logger.warn("Interrupted while waiting for a queueing permit, task is submitted without throttling", e);
+            Thread.currentThread().interrupt();
+            return false;
+        }
+    }
+
+    /**
+     * Submits a task once a permit is available. If no permit could be acquired (interruption), the task is submitted
+     * as is, so that no permit is released for a task that never held one.
+     */
+    @Override
+    public <T> ListenableFuture<T> submit(Callable<T> task) {
+        if (acquirePermit()) {
+            return super.submit(new CallableWithPermitRelease<T>(task));
+        }
+        return super.submit(task);
+    }
+
+    /**
+     * Submits a task once a permit is available. If no permit could be acquired (interruption), the task is submitted
+     * as is, so that no permit is released for a task that never held one.
+     */
+    @Override
+    public <T> ListenableFuture<T> submit(Runnable task, T result) {
+        if (acquirePermit()) {
+            return super.submit(new RunnableWithPermitRelease(task), result);
+        }
+        return super.submit(task, result);
+    }
+
+    /**
+     * Submits a task once a permit is available. If no permit could be acquired (interruption), the task is submitted
+     * as is, so that no permit is released for a task that never held one.
+     */
+    @Override
+    public ListenableFuture<?> submit(Runnable task) {
+        if (acquirePermit()) {
+            return super.submit(new RunnableWithPermitRelease(task));
+        }
+        return super.submit(task);
+    }
+
+    /**
+     * Executes a command once a permit is available. If no permit could be acquired (interruption), the command is
+     * executed as is, so that no permit is released for a command that never held one.
+     */
+    @Override
+    public void execute(Runnable command) {
+        if (acquirePermit()) {
+            super.execute(new RunnableWithPermitRelease(command));
+        } else {
+            super.execute(command);
+        }
+    }
+
+    /**
+     * Releases a permit after the task is executed
+     * 
+     */
+    class RunnableWithPermitRelease implements Runnable {
+
+        Runnable delegatee;
+
+        public RunnableWithPermitRelease(Runnable delegatee) {
+            this.delegatee = delegatee;
+        }
+
+        @Override
+        public void run() {
+            try {
+                delegatee.run();
+            } finally {
+                // released even if the task throws
+                queueingPermits.release();
+            }
+
+        }
+    }
+
+    /**
+     * Releases a permit after the task is completed
+     * 
+     */
+    class CallableWithPermitRelease<T> implements Callable<T> {
+
+        Callable<T> delegatee;
+
+        public CallableWithPermitRelease(Callable<T> delegatee) {
+            this.delegatee = delegatee;
+        }
+
+        @Override
+        public T call() throws Exception {
+            try {
+                return delegatee.call();
+            } finally {
+                // released even if the task throws
+                queueingPermits.release();
+            }
+        }
+
+    }
+
+    /**
+     * Not supported.
+     * 
+     * @throws UnsupportedOperationException
+     *             always
+     */
+    @Override
+    public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException {
+        throw new UnsupportedOperationException("Not implemented");
+    }
+
+    /**
+     * Not supported.
+     * 
+     * @throws UnsupportedOperationException
+     *             always
+     */
+    @Override
+    public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)
+            throws InterruptedException {
+        throw new UnsupportedOperationException("Not implemented");
+    }
+
+    /**
+     * Not supported.
+     * 
+     * @throws UnsupportedOperationException
+     *             always
+     */
+    @Override
+    public <T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException {
+        throw new UnsupportedOperationException("Not implemented");
+    }
+
+    /**
+     * Not supported.
+     * 
+     * @throws UnsupportedOperationException
+     *             always
+     */
+    @Override
+    public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)
+            throws InterruptedException, ExecutionException, TimeoutException {
+        throw new UnsupportedOperationException("Not implemented");
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/8117de6d/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/TCPEmitter.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/TCPEmitter.java b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/TCPEmitter.java
index c15f355..0bcce32 100644
--- a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/TCPEmitter.java
+++ b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/TCPEmitter.java
@@ -22,7 +22,9 @@ import java.net.ConnectException;
 import java.net.InetSocketAddress;
 import java.nio.ByteBuffer;
 import java.nio.channels.ClosedChannelException;
+import java.util.Map;
 import java.util.concurrent.Executors;
+import java.util.concurrent.Semaphore;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
 
@@ -55,11 +57,16 @@ import org.slf4j.LoggerFactory;
 
 import com.google.common.collect.BiMap;
 import com.google.common.collect.HashBiMap;
+import com.google.common.collect.Maps;
 import com.google.inject.Inject;
 import com.google.inject.name.Named;
 
 /**
  * TCPEmitter - Uses TCP to send messages across partitions.
+ * <p>
+ * It maintains a mapping of partition to channel, updated upon cluster updates.
+ * <p>
+ * A throttling mechanism is also provided, so that back pressure can be applied if consumers are too slow.
  * 
  */
 
@@ -92,13 +99,29 @@ public class TCPEmitter implements Emitter, ClusterChangeListener {
     @Inject
     SerializerDeserializerFactory serDeserFactory;
     SerializerDeserializer serDeser;
+    Map<Integer, Semaphore> writePermits = Maps.newHashMap();
 
     EmitterMetrics metrics;
 
+    final private int maxPendingWrites;
+
+    /**
+     * 
+     * @param topology
+     *            the target cluster configuration
+     * @param timeout
+     *            netty timeout
+     * @param maxPendingWrites
+     *            maximum number of events not yet flushed to the TCP buffer
+     * @throws InterruptedException
+     *             in case of an interruption
+     */
     @Inject
-    public TCPEmitter(Cluster topology, @Named("s4.comm.timeout") int timeout) throws InterruptedException {
+    public TCPEmitter(Cluster topology, @Named("s4.comm.timeout") int timeout,
+            @Named("s4.emitter.maxPendingWrites") int maxPendingWrites) throws InterruptedException {
         this.nettyTimeout = timeout;
         this.topology = topology;
+        this.maxPendingWrites = maxPendingWrites;
         this.lock = new ReentrantLock();
 
         // Initialize data structures
@@ -174,16 +197,26 @@ public class TCPEmitter implements Emitter, ClusterChangeListener {
             }
         }
 
+        try {
+            writePermits.get(partitionId).acquire();
+        } catch (InterruptedException e) {
+            logger.error("Interrupted while acquiring permit", e);
+            Thread.currentThread().interrupt();
+        }
+
         Channel c = partitionChannelMap.get(partitionId);
         if (c == null) {
             logger.warn("Could not find channel for partition {}", partitionId);
             return;
         }
+
         c.write(buffer).addListener(new MessageSendingListener(partitionId));
     }
 
     @Override
     public boolean send(int partitionId, ByteBuffer message) {
+        // TODO a possible optimization would be to buffer messages per partition, with a small timeout. This will limit
+        // the number of writes and therefore system calls.
         sendMessage(partitionId, message);
         return true;
     }
@@ -235,6 +268,9 @@ public class TCPEmitter implements Emitter, ClusterChangeListener {
                     removeChannel(partition);
                 }
                 partitionNodeMap.forcePut(partition, clusterNode);
+                if (!writePermits.containsKey(partition)) {
+                    writePermits.put(partition, new Semaphore(maxPendingWrites));
+                }
             }
         } finally {
             lock.unlock();
@@ -273,6 +309,7 @@ public class TCPEmitter implements Emitter, ClusterChangeListener {
 
         @Override
         public void operationComplete(ChannelFuture future) throws Exception {
+            writePermits.get(partitionId).release();
             if (!future.isSuccess()) {
                 try {
                     // TODO handle possible cluster reconfiguration between send and failure callback

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/8117de6d/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/TCPListener.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/TCPListener.java b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/TCPListener.java
index 35b1a52..c7673ae 100644
--- a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/TCPListener.java
+++ b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/TCPListener.java
@@ -62,24 +62,35 @@ public class TCPListener implements Listener {
     private final ChannelGroup channels = new DefaultChannelGroup();
     private final int nettyTimeout;
 
+    /**
+     * 
+     * @param assignment
+     *            partition assignment information
+     * @param timeout
+     *            netty timeout
+     * @param receiver
+     *            link to the application layer
+     * @param deserializerExecutorFactory
+     *            factory for creating deserialization thread pool
+     */
     @Inject
     public TCPListener(Assignment assignment, @Named("s4.comm.timeout") int timeout, final Receiver receiver,
             final DeserializerExecutorFactory deserializerExecutorFactory) {
         // wait for an assignment
         node = assignment.assignClusterNode();
         nettyTimeout = timeout;
-
         ChannelFactory factory = new NioServerSocketChannelFactory(Executors.newCachedThreadPool(),
                 Executors.newCachedThreadPool());
 
         bootstrap = new ServerBootstrap(factory);
 
         bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
+            @Override
             public ChannelPipeline getPipeline() {
                 ChannelPipeline p = Channels.pipeline();
                 p.addLast("decoder", new LengthFieldBasedFrameDecoder(999999, 0, 4, 0, 4));
                 p.addLast("executionhandler", new ExecutionHandler(deserializerExecutorFactory.create()));
-                p.addLast("receiver", new ChannelHandler(receiver));
+                p.addLast("receiver", new EventDecoderHandler(receiver));
 
                 return p;
             }
@@ -100,6 +111,7 @@ public class TCPListener implements Listener {
         return node.getPartition();
     }
 
+    @Override
     public void close() {
         try {
             channels.close().await();
@@ -109,19 +121,21 @@ public class TCPListener implements Listener {
         bootstrap.releaseExternalResources();
     }
 
-    public class ChannelHandler extends SimpleChannelHandler {
+    public class EventDecoderHandler extends SimpleChannelHandler {
         private final Receiver receiver;
 
-        public ChannelHandler(Receiver receiver) {
+        public EventDecoderHandler(Receiver receiver) {
             this.receiver = receiver;
         }
 
+        @Override
         public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) {
             channels.add(e.getChannel());
             ChannelBuffer buffer = (ChannelBuffer) e.getMessage();
             receiver.receive(buffer.toByteBuffer());
         }
 
+        @Override
         public void exceptionCaught(ChannelHandlerContext context, ExceptionEvent event) {
             logger.error("Error", event.getCause());
             Channel c = context.getChannel();

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/8117de6d/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/TCPRemoteEmitter.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/TCPRemoteEmitter.java b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/TCPRemoteEmitter.java
index a22767b..c7d1bef 100644
--- a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/TCPRemoteEmitter.java
+++ b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/TCPRemoteEmitter.java
@@ -33,12 +33,14 @@ public class TCPRemoteEmitter extends TCPEmitter implements RemoteEmitter {
 
     /**
      * Sends to remote subclusters. This is dynamically created, through an injected factory, when new subclusters are
-     * discovered (as remote streams outputs)
+     * discovered (as remote streams outputs).
+     * <p>
+     * See {@link TCPEmitter} for more information about the implementation.
      */
     @Inject
-    public TCPRemoteEmitter(@Assisted Cluster topology, @Named("s4.comm.timeout") int timeout)
-            throws InterruptedException {
-        super(topology, timeout);
+    public TCPRemoteEmitter(@Assisted Cluster topology, @Named("s4.comm.timeout") int timeout,
+            @Named("s4.emitter.maxPendingWrites") int maxPendingWrites) throws InterruptedException {
+        super(topology, timeout, maxPendingWrites);
     }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/8117de6d/subprojects/s4-comm/src/main/resources/default.s4.comm.properties
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/main/resources/default.s4.comm.properties b/subprojects/s4-comm/src/main/resources/default.s4.comm.properties
index 33d75e7..11db4eb 100644
--- a/subprojects/s4-comm/src/main/resources/default.s4.comm.properties
+++ b/subprojects/s4-comm/src/main/resources/default.s4.comm.properties
@@ -1,6 +1,7 @@
 s4.comm.emitter.class=org.apache.s4.comm.tcp.TCPEmitter
 s4.comm.emitter.remote.class=org.apache.s4.comm.tcp.TCPRemoteEmitter
 s4.comm.listener.class=org.apache.s4.comm.tcp.TCPListener
+
 # I/O channel connection timeout, when applicable (e.g. used by netty)
 s4.comm.timeout=1000
 s4.cluster.zk_address = localhost:2181
@@ -11,5 +12,8 @@ s4.cluster.zk_connection_timeout = 10000
 s4.sender.parallelism=1
 # maximum number of events in the buffer of the sender stage
 s4.sender.workQueueSize=10000
+# maximum number of pending writes to a given comm channel
+s4.emitter.maxPendingWrites=1000
+
 # maximum number of events in the buffer of the processing stage
 s4.stream.workQueueSize=10000
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/8117de6d/subprojects/s4-core/src/main/java/org/apache/s4/core/SenderImpl.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/SenderImpl.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/SenderImpl.java
index 79a356c..7dd84b9 100644
--- a/subprojects/s4-core/src/main/java/org/apache/s4/core/SenderImpl.java
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/SenderImpl.java
@@ -95,6 +95,7 @@ public class SenderImpl implements Sender {
     public boolean checkAndSendIfNotLocal(String hashKey, Event event) {
         int partition = (int) (hasher.hash(hashKey) % emitter.getPartitionCount());
         if (partition == localPartitionId) {
+            metrics.sentLocal();
             /* Hey we are in the same JVM, don't use the network. */
             return false;
         }

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/8117de6d/subprojects/s4-core/src/main/java/org/apache/s4/core/staging/DefaultSenderExecutorServiceFactory.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/staging/DefaultSenderExecutorServiceFactory.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/staging/DefaultSenderExecutorServiceFactory.java
index 344bd59..c2ae49d 100644
--- a/subprojects/s4-core/src/main/java/org/apache/s4/core/staging/DefaultSenderExecutorServiceFactory.java
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/staging/DefaultSenderExecutorServiceFactory.java
@@ -2,7 +2,7 @@ package org.apache.s4.core.staging;
 
 import java.util.concurrent.ExecutorService;
 
-import org.apache.s4.comm.ThrottlingThreadPoolExecutorService;
+import org.apache.s4.comm.staging.ThrottlingThreadPoolExecutorService;
 
 import com.google.inject.Inject;
 import com.google.inject.name.Named;
@@ -14,8 +14,8 @@ import com.google.inject.name.Named;
  */
 public class DefaultSenderExecutorServiceFactory implements SenderExecutorServiceFactory {
 
-    private int threadPoolSize;
-    private int workQueueSize;
+    private final int threadPoolSize;
+    private final int workQueueSize;
 
     @Inject
     public DefaultSenderExecutorServiceFactory(@Named("s4.sender.parallelism") int threadPoolSize,

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/8117de6d/subprojects/s4-core/src/main/java/org/apache/s4/core/staging/DefaultStreamProcessingExecutorServiceFactory.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/staging/DefaultStreamProcessingExecutorServiceFactory.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/staging/DefaultStreamProcessingExecutorServiceFactory.java
index e0b52a9..864b942 100644
--- a/subprojects/s4-core/src/main/java/org/apache/s4/core/staging/DefaultStreamProcessingExecutorServiceFactory.java
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/staging/DefaultStreamProcessingExecutorServiceFactory.java
@@ -2,7 +2,7 @@ package org.apache.s4.core.staging;
 
 import java.util.concurrent.ExecutorService;
 
-import org.apache.s4.comm.ThrottlingThreadPoolExecutorService;
+import org.apache.s4.comm.staging.ThrottlingThreadPoolExecutorService;
 
 import com.google.inject.Inject;
 import com.google.inject.name.Named;

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/8117de6d/subprojects/s4-core/src/main/java/org/apache/s4/core/util/S4Metrics.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/util/S4Metrics.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/util/S4Metrics.java
index fd2d590..1b3a900 100644
--- a/subprojects/s4-core/src/main/java/org/apache/s4/core/util/S4Metrics.java
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/util/S4Metrics.java
@@ -41,6 +41,10 @@ public class S4Metrics {
     private final Meter bytesMeter = Metrics.newMeter(ReceiverImpl.class, "received-bytes", "bytes-count",
             TimeUnit.SECONDS);
 
+    private final Meter localEventsMeter = Metrics.newMeter(Stream.class, "sent-local", "sent-local", TimeUnit.SECONDS);
+    private final Meter remoteEventsMeter = Metrics.newMeter(Stream.class, "sent-remote", "sent-remote",
+            TimeUnit.SECONDS);
+
     private Meter[] senderMeters;
 
     private final Map<String, Meter> dequeuingStreamMeters = Maps.newHashMap();
@@ -56,6 +60,16 @@ public class S4Metrics {
         for (int i = 0; i < senderMeters.length; i++) {
             senderMeters[i] = Metrics.newMeter(SenderImpl.class, "sender", "sent-to-" + (i), TimeUnit.SECONDS);
         }
+        Metrics.newGauge(Stream.class, "local-vs-remote", new Gauge<Double>() {
+
+            @Override
+            public Double value() {
+                // this will return NaN (0/0) or Infinity (x/0) if the divider is zero
+                return localEventsMeter.oneMinuteRate() / remoteEventsMeter.oneMinuteRate();
+            }
+
+        });
+
     }
 
     public void createCacheGauges(ProcessingElement prototype, final LoadingCache<String, ProcessingElement> cache) {
@@ -94,6 +108,7 @@ public class S4Metrics {
     }
 
     public void sentEvent(int partition) {
+        remoteEventsMeter.mark();
         try {
             senderMeters[partition].mark();
         } catch (NullPointerException e) {
@@ -103,6 +118,10 @@ public class S4Metrics {
         }
     }
 
+    public void sentLocal() {
+        localEventsMeter.mark();
+    }
+
     public void createStreamMeters(String name) {
         // TODO avoid maps to avoid map lookups?
         dequeuingStreamMeters.put(name,
@@ -134,14 +153,15 @@ public class S4Metrics {
 
     public static class CheckpointingMetrics {
 
-        static Meter rejectedSerializationTask = Metrics.newMeter(CheckpointingMetrics.class, "checkpointing",
-                "rejected-serialization-task", TimeUnit.SECONDS);
-        static Meter rejectedStorageTask = Metrics.newMeter(CheckpointingMetrics.class, "checkpointing",
-                "rejected-storage-task", TimeUnit.SECONDS);
-        static Meter fetchedCheckpoint = Metrics.newMeter(CheckpointingMetrics.class, "checkpointing",
-                "fetched-checkpoint", TimeUnit.SECONDS);
-        static Meter fetchedCheckpointFailure = Metrics.newMeter(CheckpointingMetrics.class, "checkpointing",
-                "fetched-checkpoint-failed", TimeUnit.SECONDS);
+        static Meter rejectedSerializationTask = Metrics.newMeter(CheckpointingMetrics.class,
+                "checkpointing-rejected-serialization-task", "checkpointing-rejected-serialization-task",
+                TimeUnit.SECONDS);
+        static Meter rejectedStorageTask = Metrics.newMeter(CheckpointingMetrics.class,
+                "checkpointing-rejected-storage-task", "checkpointing-rejected-storage-task", TimeUnit.SECONDS);
+        static Meter fetchedCheckpoint = Metrics.newMeter(CheckpointingMetrics.class,
+                "checkpointing-fetched-checkpoint", "checkpointing-fetched-checkpoint", TimeUnit.SECONDS);
+        static Meter fetchedCheckpointFailure = Metrics.newMeter(CheckpointingMetrics.class,
+                "checkpointing-fetched-checkpoint-failed", "checkpointing-fetched-checkpoint-failed", TimeUnit.SECONDS);
 
         public static void rejectedSerializationTask() {
             rejectedSerializationTask.mark();

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/8117de6d/subprojects/s4-core/src/test/java/org/apache/s4/fixtures/MockCoreModule.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/test/java/org/apache/s4/fixtures/MockCoreModule.java b/subprojects/s4-core/src/test/java/org/apache/s4/fixtures/MockCoreModule.java
index fc87265..e18acbf 100644
--- a/subprojects/s4-core/src/test/java/org/apache/s4/fixtures/MockCoreModule.java
+++ b/subprojects/s4-core/src/test/java/org/apache/s4/fixtures/MockCoreModule.java
@@ -18,8 +18,8 @@
 
 package org.apache.s4.fixtures;
 
-import org.apache.s4.comm.DefaultDeserializerExecutorFactory;
 import org.apache.s4.comm.DeserializerExecutorFactory;
+import org.apache.s4.comm.staging.MemoryAwareDeserializerExecutorFactory;
 import org.apache.s4.core.staging.DefaultSenderExecutorServiceFactory;
 import org.apache.s4.core.staging.DefaultStreamProcessingExecutorServiceFactory;
 import org.apache.s4.core.staging.SenderExecutorServiceFactory;
@@ -53,7 +53,7 @@ public class MockCoreModule extends AbstractModule {
         bind(StreamExecutorServiceFactory.class).to(DefaultStreamProcessingExecutorServiceFactory.class);
 
         bind(SenderExecutorServiceFactory.class).to(DefaultSenderExecutorServiceFactory.class);
-        bind(DeserializerExecutorFactory.class).to(DefaultDeserializerExecutorFactory.class);
+        bind(DeserializerExecutorFactory.class).to(MemoryAwareDeserializerExecutorFactory.class);
 
         bind(Integer.class).annotatedWith(Names.named("s4.sender.parallelism")).toInstance(8);
         bind(Integer.class).annotatedWith(Names.named("s4.sender.workQueueSize")).toInstance(10000);