You are viewing a plain text version of this content. The canonical link for it is here.
Posted to s4-commits@incubator.apache.org by mm...@apache.org on 2012/11/21 19:12:15 UTC

git commit: Fix classloading issue - classes that depend on the application classloader must be bound through the AppModule; the relevant classes now include, for instance, Listener implementations

Updated Branches:
  refs/heads/S4-95 f9689ea00 -> cc2c4e888


Fix classloading issue
- classes that depend on the application classloader must be bound through the AppModule;
  the relevant classes now include, for instance, Listener implementations


Project: http://git-wip-us.apache.org/repos/asf/incubator-s4/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-s4/commit/cc2c4e88
Tree: http://git-wip-us.apache.org/repos/asf/incubator-s4/tree/cc2c4e88
Diff: http://git-wip-us.apache.org/repos/asf/incubator-s4/diff/cc2c4e88

Branch: refs/heads/S4-95
Commit: cc2c4e8886df7b69e861f9e7d32fdad7094fe25f
Parents: f9689ea
Author: Matthieu Morel <mm...@apache.org>
Authored: Wed Nov 21 19:11:10 2012 +0100
Committer: Matthieu Morel <mm...@apache.org>
Committed: Wed Nov 21 20:11:23 2012 +0100

----------------------------------------------------------------------
 .../apache/s4/benchmark/simpleApp1/SimpleApp.java  |    3 +-
 .../java/org/apache/s4/comm/DefaultCommModule.java |   10 ++--
 .../java/org/apache/s4/comm/tcp/TCPEmitter.java    |    8 ++--
 .../org/apache/s4/comm/udp/UDPRemoteEmitter.java   |    5 +-
 .../java/org/apache/s4/comm/util/CommMetrics.java  |   30 ------------
 .../org/apache/s4/comm/util/EmitterMetrics.java    |   26 ++++++++++
 .../java/org/apache/s4/comm/tcp/TCPBasicTest.java  |    5 +-
 .../java/org/apache/s4/comm/udp/UDPBasicTest.java  |    5 +-
 .../org/apache/s4/fixtures/TCPTransportModule.java |   18 +++++++
 .../org/apache/s4/fixtures/UDPTransportModule.java |   18 +++++++
 .../src/main/java/org/apache/s4/core/App.java      |    5 ++
 .../main/java/org/apache/s4/core/AppModule.java    |   12 +++++
 .../java/org/apache/s4/core/DefaultCoreModule.java |    8 ---
 .../java/org/apache/s4/core/ProcessingElement.java |    3 -
 .../main/java/org/apache/s4/core/ReceiverImpl.java |   12 +++--
 .../main/java/org/apache/s4/core/RemoteSender.java |    5 --
 .../java/org/apache/s4/core/RemoteSenders.java     |    3 +-
 .../main/java/org/apache/s4/core/SenderImpl.java   |   16 ++++---
 .../src/main/java/org/apache/s4/core/Stream.java   |    9 ++-
 .../java/org/apache/s4/core/util/S4Metrics.java    |   37 +++++++--------
 .../org/apache/s4/fixtures/MockCommModule.java     |   16 +++++--
 .../org/apache/s4/fixtures/MockCoreModule.java     |   12 +----
 .../java/org/apache/s4/example/counter/MyApp.java  |    2 +-
 .../s4/example/twitter/TwitterCounterApp.java      |   21 ++++++++
 24 files changed, 177 insertions(+), 112 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/cc2c4e88/subprojects/s4-benchmarks/src/main/java/org/apache/s4/benchmark/simpleApp1/SimpleApp.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-benchmarks/src/main/java/org/apache/s4/benchmark/simpleApp1/SimpleApp.java b/subprojects/s4-benchmarks/src/main/java/org/apache/s4/benchmark/simpleApp1/SimpleApp.java
index 00ac454..c2fde2b 100644
--- a/subprojects/s4-benchmarks/src/main/java/org/apache/s4/benchmark/simpleApp1/SimpleApp.java
+++ b/subprojects/s4-benchmarks/src/main/java/org/apache/s4/benchmark/simpleApp1/SimpleApp.java
@@ -13,6 +13,7 @@ import com.google.common.collect.ImmutableList;
 import com.google.inject.Inject;
 import com.google.inject.name.Named;
 import com.yammer.metrics.reporting.ConsoleReporter;
+import com.yammer.metrics.reporting.CsvReporter;
 
 public class SimpleApp extends App {
 
@@ -34,7 +35,7 @@ public class SimpleApp extends App {
                 throw new RuntimeException("Cannot create log dir " + logDirectory.getAbsolutePath());
             }
         }
-        // CsvReporter.enable(logDirectory, 5, TimeUnit.SECONDS);
+        CsvReporter.enable(logDirectory, 10, TimeUnit.SECONDS);
         ConsoleReporter.enable(10, TimeUnit.SECONDS);
 
         SimplePE1 simplePE1 = createPE(SimplePE1.class, "simplePE1");

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/cc2c4e88/subprojects/s4-comm/src/main/java/org/apache/s4/comm/DefaultCommModule.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/DefaultCommModule.java b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/DefaultCommModule.java
index 682bb39..e444e1e 100644
--- a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/DefaultCommModule.java
+++ b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/DefaultCommModule.java
@@ -27,7 +27,6 @@ import org.apache.commons.configuration.ConfigurationException;
 import org.apache.commons.configuration.PropertiesConfiguration;
 import org.apache.s4.base.Emitter;
 import org.apache.s4.base.Hasher;
-import org.apache.s4.base.Listener;
 import org.apache.s4.base.RemoteEmitter;
 import org.apache.s4.base.SerializerDeserializer;
 import org.apache.s4.comm.serialize.KryoSerDeser;
@@ -100,6 +99,10 @@ public class DefaultCommModule extends AbstractModule {
 
         bind(Clusters.class).to(ClustersFromZK.class);
 
+        bind(RemoteEmitters.class);
+
+        bind(DeserializerExecutorFactory.class).to(DefaultDeserializerExecutorFactory.class);
+
         try {
             Class<? extends Emitter> emitterClass = (Class<? extends Emitter>) Class.forName(config
                     .getString("s4.comm.emitter.class"));
@@ -112,14 +115,9 @@ public class DefaultCommModule extends AbstractModule {
                     RemoteEmitterFactory.class));
             bind(RemoteEmitters.class);
 
-            bind(Listener.class).to(
-                    (Class<? extends Listener>) Class.forName(config.getString("s4.comm.listener.class")));
-
-            bind(DeserializerExecutorFactory.class).to(DefaultDeserializerExecutorFactory.class);
         } catch (ClassNotFoundException e) {
             logger.error("Cannot find class implementation ", e);
         }
-
     }
 
     @SuppressWarnings("serial")

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/cc2c4e88/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/TCPEmitter.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/TCPEmitter.java b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/TCPEmitter.java
index d09f2b6..c15f355 100644
--- a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/TCPEmitter.java
+++ b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/TCPEmitter.java
@@ -32,8 +32,7 @@ import org.apache.s4.comm.serialize.SerializerDeserializerFactory;
 import org.apache.s4.comm.topology.Cluster;
 import org.apache.s4.comm.topology.ClusterChangeListener;
 import org.apache.s4.comm.topology.ClusterNode;
-import org.apache.s4.comm.util.CommMetrics;
-import org.apache.s4.comm.util.CommMetrics.EmitterMetrics;
+import org.apache.s4.comm.util.EmitterMetrics;
 import org.jboss.netty.bootstrap.ClientBootstrap;
 import org.jboss.netty.buffer.ChannelBuffer;
 import org.jboss.netty.buffer.ChannelBuffers;
@@ -94,7 +93,7 @@ public class TCPEmitter implements Emitter, ClusterChangeListener {
     SerializerDeserializerFactory serDeserFactory;
     SerializerDeserializer serDeser;
 
-    CommMetrics.EmitterMetrics metrics;
+    EmitterMetrics metrics;
 
     @Inject
     public TCPEmitter(Cluster topology, @Named("s4.comm.timeout") int timeout) throws InterruptedException {
@@ -133,8 +132,8 @@ public class TCPEmitter implements Emitter, ClusterChangeListener {
         refreshCluster();
         this.topology.addListener(this);
         serDeser = serDeserFactory.createSerializerDeserializer(Thread.currentThread().getContextClassLoader());
-
         metrics = new EmitterMetrics(topology);
+
     }
 
     private boolean connectTo(Integer partitionId) {
@@ -205,6 +204,7 @@ public class TCPEmitter implements Emitter, ClusterChangeListener {
         });
     }
 
+    @Override
     public void close() {
         try {
             channels.close().await();

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/cc2c4e88/subprojects/s4-comm/src/main/java/org/apache/s4/comm/udp/UDPRemoteEmitter.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/udp/UDPRemoteEmitter.java b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/udp/UDPRemoteEmitter.java
index 050a6ec..b1d1615 100644
--- a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/udp/UDPRemoteEmitter.java
+++ b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/udp/UDPRemoteEmitter.java
@@ -18,6 +18,7 @@
 
 package org.apache.s4.comm.udp;
 
+import org.apache.s4.base.RemoteEmitter;
 import org.apache.s4.comm.topology.Cluster;
 
 import com.google.inject.Inject;
@@ -25,9 +26,9 @@ import com.google.inject.assistedinject.Assisted;
 
 /**
  * UDP-based emitter for sending events to remote clusters.
- *
+ * 
  */
-public class UDPRemoteEmitter extends UDPEmitter {
+public class UDPRemoteEmitter extends UDPEmitter implements RemoteEmitter {
 
     /**
      * Sends to remote subclusters. This is dynamically created, through an injected factory, when new subclusters are

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/cc2c4e88/subprojects/s4-comm/src/main/java/org/apache/s4/comm/util/CommMetrics.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/util/CommMetrics.java b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/util/CommMetrics.java
deleted file mode 100644
index 37ff121..0000000
--- a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/util/CommMetrics.java
+++ /dev/null
@@ -1,30 +0,0 @@
-package org.apache.s4.comm.util;
-
-import java.util.concurrent.TimeUnit;
-
-import org.apache.s4.comm.tcp.TCPEmitter;
-import org.apache.s4.comm.topology.Cluster;
-
-import com.yammer.metrics.Metrics;
-import com.yammer.metrics.core.Meter;
-
-public class CommMetrics {
-
-    public static class EmitterMetrics {
-        private Meter[] emittersMeters;
-
-        public EmitterMetrics(Cluster cluster) {
-            emittersMeters = new Meter[cluster.getPhysicalCluster().getPartitionCount()];
-            for (int i = 0; i < cluster.getPhysicalCluster().getPartitionCount(); i++) {
-                emittersMeters[i] = Metrics
-                        .newMeter(TCPEmitter.class, "event-emitted@" + cluster.getPhysicalCluster().getName()
-                                + "@partition-" + i, "event-emitted", TimeUnit.SECONDS);
-            }
-        }
-
-        public void sentMessage(int partitionId) {
-            emittersMeters[partitionId].mark();
-        }
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/cc2c4e88/subprojects/s4-comm/src/main/java/org/apache/s4/comm/util/EmitterMetrics.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/util/EmitterMetrics.java b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/util/EmitterMetrics.java
new file mode 100644
index 0000000..2fe7e3e
--- /dev/null
+++ b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/util/EmitterMetrics.java
@@ -0,0 +1,26 @@
+package org.apache.s4.comm.util;
+
+import java.util.concurrent.TimeUnit;
+
+import org.apache.s4.comm.tcp.TCPEmitter;
+import org.apache.s4.comm.topology.Cluster;
+
+import com.google.inject.Inject;
+import com.yammer.metrics.Metrics;
+import com.yammer.metrics.core.Meter;
+
+public class EmitterMetrics {
+    private final Meter[] emittersMeters;
+
+    public EmitterMetrics(Cluster cluster) {
+        emittersMeters = new Meter[cluster.getPhysicalCluster().getPartitionCount()];
+        for (int i = 0; i < cluster.getPhysicalCluster().getPartitionCount(); i++) {
+            emittersMeters[i] = Metrics.newMeter(TCPEmitter.class, "event-emitted@"
+                    + cluster.getPhysicalCluster().getName() + "@partition-" + i, "event-emitted", TimeUnit.SECONDS);
+        }
+    }
+
+    public void sentMessage(int partitionId) {
+        emittersMeters[partitionId].mark();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/cc2c4e88/subprojects/s4-comm/src/test/java/org/apache/s4/comm/tcp/TCPBasicTest.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/test/java/org/apache/s4/comm/tcp/TCPBasicTest.java b/subprojects/s4-comm/src/test/java/org/apache/s4/comm/tcp/TCPBasicTest.java
index 741524a..f43d61f 100644
--- a/subprojects/s4-comm/src/test/java/org/apache/s4/comm/tcp/TCPBasicTest.java
+++ b/subprojects/s4-comm/src/test/java/org/apache/s4/comm/tcp/TCPBasicTest.java
@@ -11,6 +11,7 @@ import org.apache.s4.comm.DefaultCommModule;
 import org.apache.s4.fixtures.CommTestUtils;
 import org.apache.s4.fixtures.MockReceiverModule;
 import org.apache.s4.fixtures.NoOpReceiverModule;
+import org.apache.s4.fixtures.TCPTransportModule;
 import org.apache.s4.fixtures.ZkBasedTest;
 import org.junit.Test;
 
@@ -26,12 +27,12 @@ public class TCPBasicTest extends ZkBasedTest {
         try {
             Injector injector1 = Guice
                     .createInjector(new DefaultCommModule(Resources.getResource("default.s4.comm.properties")
-                            .openStream(), "cluster1"), new NoOpReceiverModule());
+                            .openStream(), "cluster1"), new TCPTransportModule(), new NoOpReceiverModule());
             Emitter emitter = injector1.getInstance(Emitter.class);
 
             Injector injector2 = Guice
                     .createInjector(new DefaultCommModule(Resources.getResource("default.s4.comm.properties")
-                            .openStream(), "cluster1"), new MockReceiverModule());
+                            .openStream(), "cluster1"), new TCPTransportModule(), new MockReceiverModule());
             // creating the listener will inject assignment (i.e. assign a partition) and receiver (delegatee for
             // listener, here a mock which simply intercepts the message and notifies through a countdown latch)
             injector2.getInstance(Listener.class);

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/cc2c4e88/subprojects/s4-comm/src/test/java/org/apache/s4/comm/udp/UDPBasicTest.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/test/java/org/apache/s4/comm/udp/UDPBasicTest.java b/subprojects/s4-comm/src/test/java/org/apache/s4/comm/udp/UDPBasicTest.java
index 54e6347..42bdf91 100644
--- a/subprojects/s4-comm/src/test/java/org/apache/s4/comm/udp/UDPBasicTest.java
+++ b/subprojects/s4-comm/src/test/java/org/apache/s4/comm/udp/UDPBasicTest.java
@@ -11,6 +11,7 @@ import org.apache.s4.comm.DefaultCommModule;
 import org.apache.s4.fixtures.CommTestUtils;
 import org.apache.s4.fixtures.MockReceiverModule;
 import org.apache.s4.fixtures.NoOpReceiverModule;
+import org.apache.s4.fixtures.UDPTransportModule;
 import org.apache.s4.fixtures.ZkBasedTest;
 import org.junit.Test;
 
@@ -26,12 +27,12 @@ public class UDPBasicTest extends ZkBasedTest {
         try {
             Injector injector1 = Guice.createInjector(
                     new DefaultCommModule(Resources.getResource("udp.s4.comm.properties").openStream(), "cluster1"),
-                    new NoOpReceiverModule());
+                    new UDPTransportModule(), new NoOpReceiverModule());
             Emitter emitter = injector1.getInstance(Emitter.class);
 
             Injector injector2 = Guice.createInjector(
                     new DefaultCommModule(Resources.getResource("udp.s4.comm.properties").openStream(), "cluster1"),
-                    new MockReceiverModule());
+                    new UDPTransportModule(), new MockReceiverModule());
             // creating the listener will inject assignment (i.e. assign a partition) and receiver (delegatee for
             // listener)
             injector2.getInstance(Listener.class);

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/cc2c4e88/subprojects/s4-comm/src/test/java/org/apache/s4/fixtures/TCPTransportModule.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/test/java/org/apache/s4/fixtures/TCPTransportModule.java b/subprojects/s4-comm/src/test/java/org/apache/s4/fixtures/TCPTransportModule.java
new file mode 100644
index 0000000..27ae24b
--- /dev/null
+++ b/subprojects/s4-comm/src/test/java/org/apache/s4/fixtures/TCPTransportModule.java
@@ -0,0 +1,18 @@
+package org.apache.s4.fixtures;
+
+import org.apache.s4.base.Emitter;
+import org.apache.s4.base.Listener;
+import org.apache.s4.comm.tcp.TCPEmitter;
+import org.apache.s4.comm.tcp.TCPListener;
+
+import com.google.inject.AbstractModule;
+
+public class TCPTransportModule extends AbstractModule {
+
+    @Override
+    protected void configure() {
+        bind(Listener.class).to(TCPListener.class);
+        bind(Emitter.class).to(TCPEmitter.class);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/cc2c4e88/subprojects/s4-comm/src/test/java/org/apache/s4/fixtures/UDPTransportModule.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/test/java/org/apache/s4/fixtures/UDPTransportModule.java b/subprojects/s4-comm/src/test/java/org/apache/s4/fixtures/UDPTransportModule.java
new file mode 100644
index 0000000..1d1bc7a
--- /dev/null
+++ b/subprojects/s4-comm/src/test/java/org/apache/s4/fixtures/UDPTransportModule.java
@@ -0,0 +1,18 @@
+package org.apache.s4.fixtures;
+
+import org.apache.s4.base.Emitter;
+import org.apache.s4.base.Listener;
+import org.apache.s4.comm.udp.UDPEmitter;
+import org.apache.s4.comm.udp.UDPListener;
+
+import com.google.inject.AbstractModule;
+
+public class UDPTransportModule extends AbstractModule {
+
+    @Override
+    protected void configure() {
+        bind(Listener.class).to(UDPListener.class);
+        bind(Emitter.class).to(UDPEmitter.class);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/cc2c4e88/subprojects/s4-core/src/main/java/org/apache/s4/core/App.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/App.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/App.java
index eb2ae1d..6b8c7ef 100644
--- a/subprojects/s4-core/src/main/java/org/apache/s4/core/App.java
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/App.java
@@ -32,6 +32,7 @@ import org.apache.s4.comm.serialize.SerializerDeserializerFactory;
 import org.apache.s4.comm.topology.RemoteStreams;
 import org.apache.s4.core.ft.CheckpointingFramework;
 import org.apache.s4.core.staging.StreamExecutorServiceFactory;
+import org.apache.s4.core.util.S4Metrics;
 import org.apache.s4.core.window.AbstractSlidingWindowPE;
 import org.apache.s4.core.window.SlotFactory;
 import org.slf4j.Logger;
@@ -86,6 +87,9 @@ public abstract class App {
     @Inject
     CheckpointingFramework checkpointingFramework;
 
+    @Inject
+    S4Metrics metrics;
+
     // serialization uses the application class loader
     @Inject
     private SerializerDeserializerFactory serDeserFactory;
@@ -222,6 +226,7 @@ public abstract class App {
 
         // logger.info("Add PE prototype [{}] with stream [{}].", toString(pePrototype), toString(stream));
         pePrototypes.add(pePrototype);
+        metrics.createCacheGauges(pePrototype, pePrototype.peInstances);
 
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/cc2c4e88/subprojects/s4-core/src/main/java/org/apache/s4/core/AppModule.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/AppModule.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/AppModule.java
index 4d07afe..909fea6 100644
--- a/subprojects/s4-core/src/main/java/org/apache/s4/core/AppModule.java
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/AppModule.java
@@ -1,7 +1,12 @@
 package org.apache.s4.core;
 
+import org.apache.s4.base.Listener;
+import org.apache.s4.base.Receiver;
+import org.apache.s4.base.Sender;
 import org.apache.s4.base.SerializerDeserializer;
 import org.apache.s4.comm.serialize.SerializerDeserializerFactory;
+import org.apache.s4.comm.tcp.TCPListener;
+import org.apache.s4.core.util.S4Metrics;
 
 import com.google.inject.AbstractModule;
 import com.google.inject.Provides;
@@ -21,6 +26,13 @@ public class AppModule extends AbstractModule {
 
     @Override
     protected void configure() {
+        bind(S4Metrics.class);
+
+        bind(Receiver.class).to(ReceiverImpl.class);
+        bind(Sender.class).to(SenderImpl.class);
+
+        // TODO allow pluggable transport implementation
+        bind(Listener.class).to(TCPListener.class);
 
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/cc2c4e88/subprojects/s4-core/src/main/java/org/apache/s4/core/DefaultCoreModule.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/DefaultCoreModule.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/DefaultCoreModule.java
index 5de792e..8b7a632 100644
--- a/subprojects/s4-core/src/main/java/org/apache/s4/core/DefaultCoreModule.java
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/DefaultCoreModule.java
@@ -25,8 +25,6 @@ import org.apache.commons.configuration.ConfigurationConverter;
 import org.apache.commons.configuration.ConfigurationException;
 import org.apache.commons.configuration.PropertiesConfiguration;
 import org.apache.s4.base.Hasher;
-import org.apache.s4.base.Receiver;
-import org.apache.s4.base.Sender;
 import org.apache.s4.base.util.S4RLoaderFactory;
 import org.apache.s4.comm.DefaultHasher;
 import org.apache.s4.core.ft.CheckpointingFramework;
@@ -37,7 +35,6 @@ import org.apache.s4.core.staging.DefaultStreamProcessingExecutorServiceFactory;
 import org.apache.s4.core.staging.RemoteSendersExecutorServiceFactory;
 import org.apache.s4.core.staging.SenderExecutorServiceFactory;
 import org.apache.s4.core.staging.StreamExecutorServiceFactory;
-import org.apache.s4.core.util.S4Metrics;
 import org.apache.s4.deploy.DeploymentManager;
 import org.apache.s4.deploy.DistributedDeploymentManager;
 import org.slf4j.Logger;
@@ -80,9 +77,6 @@ public class DefaultCoreModule extends AbstractModule {
         /* The hashing function to map keys top partitions. */
         bind(Hasher.class).to(DefaultHasher.class);
 
-        bind(Receiver.class).to(ReceiverImpl.class);
-        bind(Sender.class).to(SenderImpl.class);
-
         bind(DeploymentManager.class).to(DistributedDeploymentManager.class);
 
         bind(S4RLoaderFactory.class);
@@ -91,8 +85,6 @@ public class DefaultCoreModule extends AbstractModule {
         // org.apache.s4.core.ft.FileSytemBasedCheckpointingModule
         bind(CheckpointingFramework.class).to(NoOpCheckpointingFramework.class);
 
-        bind(S4Metrics.class);
-
         bind(SenderExecutorServiceFactory.class).to(DefaultSenderExecutorServiceFactory.class);
         bind(RemoteSendersExecutorServiceFactory.class).to(DefaultRemoteSendersExecutorServiceFactory.class);
 

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/cc2c4e88/subprojects/s4-core/src/main/java/org/apache/s4/core/ProcessingElement.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/ProcessingElement.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/ProcessingElement.java
index 295e416..6e219a6 100644
--- a/subprojects/s4-core/src/main/java/org/apache/s4/core/ProcessingElement.java
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/ProcessingElement.java
@@ -40,7 +40,6 @@ import org.apache.s4.core.ft.CheckpointingConfig.CheckpointingMode;
 import org.apache.s4.core.ft.CheckpointingTask;
 import org.apache.s4.core.gen.OverloadDispatcher;
 import org.apache.s4.core.gen.OverloadDispatcherGenerator;
-import org.apache.s4.core.util.S4Metrics;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -172,8 +171,6 @@ public abstract class ProcessingElement implements Cloneable {
          */
         this.pePrototype = this;
 
-        S4Metrics.createCacheGauges(this, peInstances);
-
         processingTimer = Metrics.newTimer(getClass(), getClass().getName() + "-pe-processing-time");
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/cc2c4e88/subprojects/s4-core/src/main/java/org/apache/s4/core/ReceiverImpl.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/ReceiverImpl.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/ReceiverImpl.java
index 476f4ac..6b4a1cf 100644
--- a/subprojects/s4-core/src/main/java/org/apache/s4/core/ReceiverImpl.java
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/ReceiverImpl.java
@@ -25,7 +25,6 @@ import org.apache.s4.base.Event;
 import org.apache.s4.base.Listener;
 import org.apache.s4.base.Receiver;
 import org.apache.s4.base.SerializerDeserializer;
-import org.apache.s4.comm.serialize.SerializerDeserializerFactory;
 import org.apache.s4.core.util.S4Metrics;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -55,12 +54,15 @@ public class ReceiverImpl implements Receiver {
 
     final private Listener listener;
     final private SerializerDeserializer serDeser;
-    private Map<Integer, Map<String, Stream<? extends Event>>> streams;
+    private final Map<Integer, Map<String, Stream<? extends Event>>> streams;
 
     @Inject
-    public ReceiverImpl(Listener listener, SerializerDeserializerFactory serDeserFactory) {
+    S4Metrics metrics;
+
+    @Inject
+    public ReceiverImpl(Listener listener, SerializerDeserializer serDeser) {
         this.listener = listener;
-        this.serDeser = serDeserFactory.createSerializerDeserializer(getClass().getClassLoader());
+        this.serDeser = serDeser;
 
         streams = new MapMaker().makeMap();
     }
@@ -94,7 +96,7 @@ public class ReceiverImpl implements Receiver {
 
     @Override
     public void receive(ByteBuffer message) {
-        S4Metrics.receivedEventFromCommLayer(message.array().length);
+        metrics.receivedEventFromCommLayer(message.array().length);
         Event event = (Event) serDeser.deserialize(message);
 
         String streamId = event.getStreamName();

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/cc2c4e88/subprojects/s4-core/src/main/java/org/apache/s4/core/RemoteSender.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/RemoteSender.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/RemoteSender.java
index cc9d70b..088c7bf 100644
--- a/subprojects/s4-core/src/main/java/org/apache/s4/core/RemoteSender.java
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/RemoteSender.java
@@ -23,7 +23,6 @@ import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.s4.base.Emitter;
 import org.apache.s4.base.Hasher;
-import org.apache.s4.core.util.S4Metrics;
 
 /**
  * Sends events to a remote cluster.
@@ -42,8 +41,6 @@ public class RemoteSender {
         this.hasher = hasher;
         this.remoteClusterName = clusterName;
 
-        S4Metrics.createRemoteStreamMeters(clusterName, emitter.getPartitionCount());
-
     }
 
     public void send(String hashKey, ByteBuffer message) {
@@ -55,7 +52,5 @@ public class RemoteSender {
             partition = (int) (hasher.hash(hashKey) % emitter.getPartitionCount());
         }
         emitter.send(partition, message);
-        S4Metrics.sentEventToRemoteCluster(remoteClusterName, partition);
-
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/cc2c4e88/subprojects/s4-core/src/main/java/org/apache/s4/core/RemoteSenders.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/RemoteSenders.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/RemoteSenders.java
index b6a7d5b..71a91e9 100644
--- a/subprojects/s4-core/src/main/java/org/apache/s4/core/RemoteSenders.java
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/RemoteSenders.java
@@ -58,7 +58,7 @@ public class RemoteSenders {
 
     ConcurrentMap<String, RemoteSender> sendersByTopology = new ConcurrentHashMap<String, RemoteSender>();
 
-    private ExecutorService executorService;
+    private final ExecutorService executorService;
 
     @Inject
     public RemoteSenders(RemoteEmitters remoteEmitters, RemoteStreams remoteStreams, Clusters remoteClusters,
@@ -111,6 +111,7 @@ public class RemoteSenders {
         @Override
         public void run() {
             sender.send(hashKey, serDeser.serialize(event));
+
         }
 
     }

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/cc2c4e88/subprojects/s4-core/src/main/java/org/apache/s4/core/SenderImpl.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/SenderImpl.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/SenderImpl.java
index 255cdeb..79a356c 100644
--- a/subprojects/s4-core/src/main/java/org/apache/s4/core/SenderImpl.java
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/SenderImpl.java
@@ -26,7 +26,6 @@ import org.apache.s4.base.Event;
 import org.apache.s4.base.Hasher;
 import org.apache.s4.base.Sender;
 import org.apache.s4.base.SerializerDeserializer;
-import org.apache.s4.comm.serialize.SerializerDeserializerFactory;
 import org.apache.s4.comm.topology.Assignment;
 import org.apache.s4.comm.topology.ClusterNode;
 import org.apache.s4.core.staging.SenderExecutorServiceFactory;
@@ -55,7 +54,10 @@ public class SenderImpl implements Sender {
     Assignment assignment;
     private int localPartitionId = -1;
 
-    private ExecutorService tpe;
+    private final ExecutorService tpe;
+
+    @Inject
+    S4Metrics metrics;
 
     /**
      * 
@@ -67,10 +69,10 @@ public class SenderImpl implements Sender {
      *            a hashing function to map keys to partition IDs.
      */
     @Inject
-    public SenderImpl(Emitter emitter, SerializerDeserializerFactory serDeserFactory, Hasher hasher,
-            Assignment assignment, SenderExecutorServiceFactory senderExecutorServiceFactory) {
+    public SenderImpl(Emitter emitter, SerializerDeserializer serDeser, Hasher hasher, Assignment assignment,
+            SenderExecutorServiceFactory senderExecutorServiceFactory) {
         this.emitter = emitter;
-        this.serDeser = serDeserFactory.createSerializerDeserializer(getClass().getClassLoader());
+        this.serDeser = serDeser;
         this.hasher = hasher;
         this.assignment = assignment;
         this.tpe = senderExecutorServiceFactory.create();
@@ -98,7 +100,7 @@ public class SenderImpl implements Sender {
         }
         // TODO asynch
         send(partition, serDeser.serialize(event));
-        S4Metrics.sentEvent(partition);
+        metrics.sentEvent(partition);
         return true;
     }
 
@@ -154,7 +156,7 @@ public class SenderImpl implements Sender {
                 /* Don't use the comm layer when we send to the same partition. */
                 if (localPartitionId != i) {
                     emitter.send(i, serializedEvent);
-                    S4Metrics.sentEvent(i);
+                    metrics.sentEvent(i);
 
                 }
             }

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/cc2c4e88/subprojects/s4-core/src/main/java/org/apache/s4/core/Stream.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/Stream.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/Stream.java
index 94b19b5..1de77bb 100644
--- a/subprojects/s4-core/src/main/java/org/apache/s4/core/Stream.java
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/Stream.java
@@ -29,7 +29,6 @@ import org.apache.s4.base.Receiver;
 import org.apache.s4.base.Sender;
 import org.apache.s4.base.SerializerDeserializer;
 import org.apache.s4.comm.serialize.SerializerDeserializerFactory;
-import org.apache.s4.core.util.S4Metrics;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -79,8 +78,9 @@ public class Stream<T extends Event> implements Streamable {
         this.receiver = app.getReceiver();
     }
 
+    @Override
     public void start() {
-        S4Metrics.createStreamMeters(name);
+        app.metrics.createStreamMeters(getName());
         if (logger.isTraceEnabled()) {
             if (targetPEs != null) {
                 for (ProcessingElement pe : targetPEs) {
@@ -184,6 +184,7 @@ public class Stream<T extends Event> implements Streamable {
      * 
      * @param event
      */
+    @Override
     @SuppressWarnings("unchecked")
     public void put(Event event) {
         event.setStreamId(getName());
@@ -240,6 +241,7 @@ public class Stream<T extends Event> implements Streamable {
     /**
      * @return the name
      */
+    @Override
     public String getName() {
         return name;
     }
@@ -268,6 +270,7 @@ public class Stream<T extends Event> implements Streamable {
     /**
      * Stop and close this stream.
      */
+    @Override
     public void close() {
     }
 
@@ -329,7 +332,7 @@ public class Stream<T extends Event> implements Streamable {
 
         @Override
         public void run() {
-            S4Metrics.dequeuedEvent(name);
+            app.metrics.dequeuedEvent(name);
 
             /* Send event to each target PE. */
             for (int i = 0; i < targetPEs.length; i++) {

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/cc2c4e88/subprojects/s4-core/src/main/java/org/apache/s4/core/util/S4Metrics.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/util/S4Metrics.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/util/S4Metrics.java
index 27ac387..fd2d590 100644
--- a/subprojects/s4-core/src/main/java/org/apache/s4/core/util/S4Metrics.java
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/util/S4Metrics.java
@@ -36,18 +36,18 @@ public class S4Metrics {
 
     static List<Meter> partitionSenderMeters = Lists.newArrayList();
 
-    private static Meter eventMeter = Metrics.newMeter(ReceiverImpl.class, "received-events", "event-count",
+    private final Meter eventMeter = Metrics.newMeter(ReceiverImpl.class, "received-events", "event-count",
             TimeUnit.SECONDS);
-    private static Meter bytesMeter = Metrics.newMeter(ReceiverImpl.class, "received-bytes", "bytes-count",
+    private final Meter bytesMeter = Metrics.newMeter(ReceiverImpl.class, "received-bytes", "bytes-count",
             TimeUnit.SECONDS);
 
-    private static Meter[] senderMeters;
+    private Meter[] senderMeters;
 
-    private static Map<String, Meter> dequeuingStreamMeters = Maps.newHashMap();
-    private static Map<String, Meter> droppedStreamMeters = Maps.newHashMap();
-    private static Map<String, Meter> streamQueueFullMeters = Maps.newHashMap();
+    private final Map<String, Meter> dequeuingStreamMeters = Maps.newHashMap();
+    private final Map<String, Meter> droppedStreamMeters = Maps.newHashMap();
+    private final Map<String, Meter> streamQueueFullMeters = Maps.newHashMap();
 
-    private static Map<String, Meter[]> remoteSenderMeters = Maps.newHashMap();
+    private final Map<String, Meter[]> remoteSenderMeters = Maps.newHashMap();
 
     @Inject
     private void init() {
@@ -58,24 +58,23 @@ public class S4Metrics {
         }
     }
 
-    public static void createCacheGauges(ProcessingElement prototype,
-            final LoadingCache<String, ProcessingElement> cache) {
+    public void createCacheGauges(ProcessingElement prototype, final LoadingCache<String, ProcessingElement> cache) {
 
-        Metrics.newGauge(prototype.getClass(), "PE-cache-entries", new Gauge<Long>() {
+        Metrics.newGauge(prototype.getClass(), prototype.getClass().getName() + "-cache-entries", new Gauge<Long>() {
 
             @Override
             public Long value() {
                 return cache.size();
             }
         });
-        Metrics.newGauge(prototype.getClass(), "PE-cache-evictions", new Gauge<Long>() {
+        Metrics.newGauge(prototype.getClass(), prototype.getClass().getName() + "-cache-evictions", new Gauge<Long>() {
 
             @Override
             public Long value() {
                 return cache.stats().evictionCount();
             }
         });
-        Metrics.newGauge(prototype.getClass(), "PE-cache-misses", new Gauge<Long>() {
+        Metrics.newGauge(prototype.getClass(), prototype.getClass().getName() + "-cache-misses", new Gauge<Long>() {
 
             @Override
             public Long value() {
@@ -84,17 +83,17 @@ public class S4Metrics {
         });
     }
 
-    public static void receivedEventFromCommLayer(int bytes) {
+    public void receivedEventFromCommLayer(int bytes) {
         eventMeter.mark();
         bytesMeter.mark(bytes);
     }
 
-    public static void queueIsFull(String name) {
+    public void queueIsFull(String name) {
         streamQueueFullMeters.get(name).mark();
 
     }
 
-    public static void sentEvent(int partition) {
+    public void sentEvent(int partition) {
         try {
             senderMeters[partition].mark();
         } catch (NullPointerException e) {
@@ -104,7 +103,7 @@ public class S4Metrics {
         }
     }
 
-    public static void createStreamMeters(String name) {
+    public void createStreamMeters(String name) {
         // TODO avoid maps to avoid map lookups?
         dequeuingStreamMeters.put(name,
                 Metrics.newMeter(Stream.class, "dequeued@" + name, "dequeued", TimeUnit.SECONDS));
@@ -113,11 +112,11 @@ public class S4Metrics {
                 Metrics.newMeter(Stream.class, "stream-full@" + name, "stream-full", TimeUnit.SECONDS));
     }
 
-    public static void dequeuedEvent(String name) {
+    public void dequeuedEvent(String name) {
         dequeuingStreamMeters.get(name).mark();
     }
 
-    public static void createRemoteStreamMeters(String remoteClusterName, int partitionCount) {
+    public void createRemoteStreamMeters(String remoteClusterName, int partitionCount) {
         Meter[] meters = new Meter[partitionCount];
         for (int i = 0; i < partitionCount; i++) {
             meters[i] = Metrics.newMeter(RemoteSender.class, "remote-sender@" + remoteClusterName + "@partition-" + i,
@@ -129,7 +128,7 @@ public class S4Metrics {
 
     }
 
-    public static void sentEventToRemoteCluster(String remoteClusterName, int partition) {
+    public void sentEventToRemoteCluster(String remoteClusterName, int partition) {
         remoteSenderMeters.get(remoteClusterName)[partition].mark();
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/cc2c4e88/subprojects/s4-core/src/test/java/org/apache/s4/fixtures/MockCommModule.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/test/java/org/apache/s4/fixtures/MockCommModule.java b/subprojects/s4-core/src/test/java/org/apache/s4/fixtures/MockCommModule.java
index 2bdcc5b..1dadc5b 100644
--- a/subprojects/s4-core/src/test/java/org/apache/s4/fixtures/MockCommModule.java
+++ b/subprojects/s4-core/src/test/java/org/apache/s4/fixtures/MockCommModule.java
@@ -18,7 +18,9 @@
 
 package org.apache.s4.fixtures;
 
+import org.apache.s4.base.Emitter;
 import org.apache.s4.base.Hasher;
+import org.apache.s4.base.RemoteEmitter;
 import org.apache.s4.base.SerializerDeserializer;
 import org.apache.s4.comm.DefaultHasher;
 import org.apache.s4.comm.RemoteEmitterFactory;
@@ -26,8 +28,10 @@ import org.apache.s4.comm.serialize.KryoSerDeser;
 import org.apache.s4.comm.serialize.SerializerDeserializerFactory;
 import org.apache.s4.comm.tcp.RemoteEmitters;
 import org.apache.s4.comm.topology.Assignment;
+import org.apache.s4.comm.topology.Cluster;
 import org.apache.s4.comm.topology.ClusterNode;
 import org.apache.s4.comm.topology.Clusters;
+import org.apache.s4.comm.topology.PhysicalCluster;
 import org.apache.s4.comm.topology.RemoteStreams;
 import org.apache.s4.core.RemoteSenders;
 import org.mockito.Mockito;
@@ -57,13 +61,17 @@ public class MockCommModule extends AbstractModule {
         bind(RemoteStreams.class).toInstance(Mockito.mock(RemoteStreams.class));
         bind(RemoteSenders.class).toInstance(Mockito.mock(RemoteSenders.class));
         bind(RemoteEmitters.class).toInstance(Mockito.mock(RemoteEmitters.class));
-        bind(RemoteEmitterFactory.class).toInstance(Mockito.mock(RemoteEmitterFactory.class));
         bind(Clusters.class).toInstance(Mockito.mock(Clusters.class));
-
+        Cluster mockedCluster = Mockito.mock(Cluster.class);
+        Mockito.when(mockedCluster.getPhysicalCluster()).thenReturn(new PhysicalCluster(1));
+        bind(Cluster.class).toInstance(mockedCluster);
         Assignment mockedAssignment = Mockito.mock(Assignment.class);
         Mockito.when(mockedAssignment.assignClusterNode()).thenReturn(new ClusterNode(0, 0, "machine", "Task-0"));
         bind(Assignment.class).toInstance(mockedAssignment);
-        Names.bindProperties(binder(), ImmutableMap.of("s4.cluster.name", "testCluster"));
-    }
+        Names.bindProperties(binder(), ImmutableMap.of("s4.cluster.name", "testCluster", "s4.comm.timeout", "10000"));
+        bind(Emitter.class).toInstance(Mockito.mock(Emitter.class));
+        install(new FactoryModuleBuilder().implement(RemoteEmitter.class, Mockito.mock(RemoteEmitter.class).getClass())
+                .build(RemoteEmitterFactory.class));
 
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/cc2c4e88/subprojects/s4-core/src/test/java/org/apache/s4/fixtures/MockCoreModule.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/test/java/org/apache/s4/fixtures/MockCoreModule.java b/subprojects/s4-core/src/test/java/org/apache/s4/fixtures/MockCoreModule.java
index 02fd694..fc87265 100644
--- a/subprojects/s4-core/src/test/java/org/apache/s4/fixtures/MockCoreModule.java
+++ b/subprojects/s4-core/src/test/java/org/apache/s4/fixtures/MockCoreModule.java
@@ -18,17 +18,14 @@
 
 package org.apache.s4.fixtures;
 
-import org.apache.s4.base.Emitter;
-import org.apache.s4.base.Listener;
-import org.apache.s4.base.Sender;
-import org.apache.s4.core.ReceiverImpl;
+import org.apache.s4.comm.DefaultDeserializerExecutorFactory;
+import org.apache.s4.comm.DeserializerExecutorFactory;
 import org.apache.s4.core.staging.DefaultSenderExecutorServiceFactory;
 import org.apache.s4.core.staging.DefaultStreamProcessingExecutorServiceFactory;
 import org.apache.s4.core.staging.SenderExecutorServiceFactory;
 import org.apache.s4.core.staging.StreamExecutorServiceFactory;
 import org.apache.s4.deploy.DeploymentManager;
 import org.apache.s4.deploy.NoOpDeploymentManager;
-import org.mockito.Mockito;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -50,16 +47,13 @@ public class MockCoreModule extends AbstractModule {
     @Override
     protected void configure() {
         bind(DeploymentManager.class).to(NoOpDeploymentManager.class);
-        bind(Emitter.class).toInstance(Mockito.mock(Emitter.class));
-        bind(Listener.class).toInstance(Mockito.mock(Listener.class));
-        bind(ReceiverImpl.class).toInstance(Mockito.mock(ReceiverImpl.class));
-        bind(Sender.class).toInstance(Mockito.mock(Sender.class));
 
         // Although we want to mock as much as possible, most tests still require the machinery for routing events
         // within a stream/node, therefore sender and stream executors are not mocked
         bind(StreamExecutorServiceFactory.class).to(DefaultStreamProcessingExecutorServiceFactory.class);
 
         bind(SenderExecutorServiceFactory.class).to(DefaultSenderExecutorServiceFactory.class);
+        bind(DeserializerExecutorFactory.class).to(DefaultDeserializerExecutorFactory.class);
 
         bind(Integer.class).annotatedWith(Names.named("s4.sender.parallelism")).toInstance(8);
         bind(Integer.class).annotatedWith(Names.named("s4.sender.workQueueSize")).toInstance(10000);

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/cc2c4e88/subprojects/s4-example/src/main/java/org/apache/s4/example/counter/MyApp.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-example/src/main/java/org/apache/s4/example/counter/MyApp.java b/subprojects/s4-example/src/main/java/org/apache/s4/example/counter/MyApp.java
index 1ea3a2a..46cf849 100644
--- a/subprojects/s4-example/src/main/java/org/apache/s4/example/counter/MyApp.java
+++ b/subprojects/s4-example/src/main/java/org/apache/s4/example/counter/MyApp.java
@@ -131,6 +131,6 @@ final public class MyApp extends App {
             e.printStackTrace();
         }
         myApp.close();
-        receiver.close();
+        // receiver.close();
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/cc2c4e88/test-apps/twitter-counter/src/main/java/org/apache/s4/example/twitter/TwitterCounterApp.java
----------------------------------------------------------------------
diff --git a/test-apps/twitter-counter/src/main/java/org/apache/s4/example/twitter/TwitterCounterApp.java b/test-apps/twitter-counter/src/main/java/org/apache/s4/example/twitter/TwitterCounterApp.java
index 5d7855f..456305a 100644
--- a/test-apps/twitter-counter/src/main/java/org/apache/s4/example/twitter/TwitterCounterApp.java
+++ b/test-apps/twitter-counter/src/main/java/org/apache/s4/example/twitter/TwitterCounterApp.java
@@ -18,16 +18,21 @@
 
 package org.apache.s4.example.twitter;
 
+import java.io.File;
+import java.io.IOException;
 import java.util.List;
 import java.util.concurrent.TimeUnit;
 
+import org.apache.commons.io.FileUtils;
 import org.apache.s4.base.KeyFinder;
 import org.apache.s4.core.App;
 import org.apache.s4.core.Stream;
 import org.apache.s4.core.ft.CheckpointingConfig;
 import org.apache.s4.core.ft.CheckpointingConfig.CheckpointingMode;
+import org.slf4j.LoggerFactory;
 
 import com.google.common.collect.ImmutableList;
+import com.yammer.metrics.reporting.CsvReporter;
 
 public class TwitterCounterApp extends App {
 
@@ -39,6 +44,9 @@ public class TwitterCounterApp extends App {
     protected void onInit() {
         try {
 
+            // uncomment the following in order to get metrics outputs in .csv files
+            // prepareMetricsOutputs();
+
             TopNTopicPE topNTopicPE = createPE(TopNTopicPE.class);
             topNTopicPE.setTimerInterval(10, TimeUnit.SECONDS);
             // we checkpoint this PE every 20s
@@ -77,6 +85,19 @@ public class TwitterCounterApp extends App {
         }
     }
 
+    /** Recreates {@code metrics/<partitionId>} from scratch, then dumps metrics into it as CSV every 10 seconds. */
+    private void prepareMetricsOutputs() throws IOException {
+        File metricsDirForPartition = new File("metrics/" + getReceiver().getPartitionId());
+        if (metricsDirForPartition.exists()) {
+            FileUtils.deleteDirectory(metricsDirForPartition);
+        }
+        if (!metricsDirForPartition.mkdirs()) {
+            LoggerFactory.getLogger(getClass()).error("Cannot create directory {}",
+                    metricsDirForPartition.getAbsolutePath());
+        }
+        CsvReporter.enable(metricsDirForPartition, 10, TimeUnit.SECONDS);
+    }
+
     @Override
     protected void onStart() {