You are viewing a plain text version of this content. The canonical link for it is here.
Posted to s4-commits@incubator.apache.org by mm...@apache.org on 2013/01/18 12:16:46 UTC

[1/15] git commit: Add metrics and benchmarking framework - subproject/s4-benchmarks contains a simple benchmarking framework - metrics are placed at various places across the codebase - fixed a few potential race conditions

Add metrics and benchmarking framework
- subproject/s4-benchmarks contains a simple benchmarking framework
- metrics are placed at various places across the codebase
- fixed a few potential race conditions


Project: http://git-wip-us.apache.org/repos/asf/incubator-s4/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-s4/commit/80dfe403
Tree: http://git-wip-us.apache.org/repos/asf/incubator-s4/tree/80dfe403
Diff: http://git-wip-us.apache.org/repos/asf/incubator-s4/diff/80dfe403

Branch: refs/heads/dev
Commit: 80dfe4031c55d50abc38bf298840aeebf23dfd1f
Parents: ad1b1bf
Author: Matthieu Morel <mm...@apache.org>
Authored: Mon Sep 10 10:00:34 2012 +0200
Committer: Matthieu Morel <mm...@apache.org>
Committed: Mon Sep 10 18:20:16 2012 +0200

----------------------------------------------------------------------
 build.gradle                                       |    6 +-
 settings.gradle                                    |    7 +-
 subprojects/s4-benchmarks/README.md                |   62 ++++
 subprojects/s4-benchmarks/bench-cluster.sh         |   58 ++++
 subprojects/s4-benchmarks/config/injector.config   |    3 +
 subprojects/s4-benchmarks/config/node.config       |    4 +
 subprojects/s4-benchmarks/s4-benchmarks.gradle     |   82 +++++
 .../apache/s4/benchmark/simpleApp1/Injector.java   |  235 +++++++++++++++
 .../apache/s4/benchmark/simpleApp1/SimpleApp.java  |   69 +++++
 .../apache/s4/benchmark/simpleApp1/SimplePE1.java  |   91 ++++++
 .../java/org/apache/s4/benchmark/utils/Utils.java  |   39 +++
 .../s4-benchmarks/src/main/resources/logback.xml   |   14 +
 subprojects/s4-benchmarks/startNode.sh             |   21 ++
 .../org/apache/s4/comm/serialize/KryoSerDeser.java |    4 +-
 .../org/apache/s4/comm/tcp/RemoteEmitters.java     |   14 +-
 .../java/org/apache/s4/comm/tcp/TCPEmitter.java    |   15 +-
 .../java/org/apache/s4/comm/util/CommMetrics.java  |   30 ++
 .../src/main/java/org/apache/s4/core/App.java      |    2 +-
 .../src/main/java/org/apache/s4/core/Main.java     |    9 +
 .../java/org/apache/s4/core/ProcessingElement.java |   13 +-
 .../src/main/java/org/apache/s4/core/Receiver.java |    4 +-
 .../main/java/org/apache/s4/core/RemoteSender.java |   23 +-
 .../java/org/apache/s4/core/RemoteSenders.java     |   22 +-
 .../main/java/org/apache/s4/core/RemoteStream.java |    7 +-
 .../src/main/java/org/apache/s4/core/Sender.java   |    9 +-
 .../src/main/java/org/apache/s4/core/Stream.java   |    9 +-
 .../java/org/apache/s4/core/ft/SafeKeeper.java     |   19 +-
 .../java/org/apache/s4/core/util/S4Metrics.java    |  143 +++++++++
 28 files changed, 959 insertions(+), 55 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/80dfe403/build.gradle
----------------------------------------------------------------------
diff --git a/build.gradle b/build.gradle
index 60577de..674009b 100644
--- a/build.gradle
+++ b/build.gradle
@@ -86,7 +86,8 @@ project.ext["libraries"] = [
     gradle_base_services: 'org.gradle:gradle-base-services:1.0',
     gradle_core: 'org.gradle:gradle-core:1.0',
     gradle_tooling_api: 'org.gradle:gradle-tooling-api:1.0',
-    gradle_wrapper: 'gradle-wrapper:gradle-wrapper:1.0'
+    gradle_wrapper:     'gradle-wrapper:gradle-wrapper:1.0',
+    metrics:            'com.yammer.metrics:metrics-core:2.1.3'
 ]
 
 subprojects {
@@ -131,6 +132,7 @@ subprojects {
         /* Misc. */
         compile( libraries.jcip )
         compile( libraries.zk )
+        compile( libraries.metrics )
 
         /* Testing. */
         testCompile( libraries.junit )
@@ -168,7 +170,7 @@ subprojects {
 
 evaluationDependsOnChildren()
 
-project.ext["platformProjects"] = [project(':s4-base'), project(':s4-core'), project(':s4-comm'), project(':s4-tools')]
+project.ext["platformProjects"] = [project(':s4-base'), project(':s4-core'), project(':s4-comm'), project(':s4-tools'), project(':s4-benchmarks')]
 
 configurations {
     platformLibs

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/80dfe403/settings.gradle
----------------------------------------------------------------------
diff --git a/settings.gradle b/settings.gradle
index c2b9b67..61d75e4 100644
--- a/settings.gradle
+++ b/settings.gradle
@@ -19,9 +19,10 @@
 include 's4-base'
 include 's4-core'
 include 's4-comm'
-include 's4-edsl'
-include 's4-example'
-include 's4-tools'
+//include 's4-edsl'
+//include 's4-example'
+include 's4-tools'
+include 's4-benchmarks'
 //include 's4-example'
 //include ':test-apps:simple-adapter-1'
 include ':test-apps:simple-deployable-app-1'

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/80dfe403/subprojects/s4-benchmarks/README.md
----------------------------------------------------------------------
diff --git a/subprojects/s4-benchmarks/README.md b/subprojects/s4-benchmarks/README.md
new file mode 100644
index 0000000..f7109a3
--- /dev/null
+++ b/subprojects/s4-benchmarks/README.md
@@ -0,0 +1,62 @@
+Simple S4 Benchmarking Framework
+================================
+
+This framework is intended primarily to identify hotspots in S4 platform code easily and to evaluate the impact of refactorings or new features. 
+
+> The numbers it produces are only useful in comparison with a baseline from other measurements from the same benchmark and do not represent absolute performance numbers. For that, one should use a full-fledged load injection framework or measure the performance of a live application.
+
+That said, let's look at what the benchmarking framework does and how to use it.
+
+## Measurements
+
+The benchmarking framework consists of a multithreaded injector and an application. App nodes and injector are launched directly; there is no deployment step. This makes it possible to skip the packaging and deployment steps and to easily add profiling parameters, but it requires a source distribution and a shared file system.
+
+The simplest application does nothing but count incoming keyed messages on a single stream, but other simple applications can easily be added — for instance, with multiple streams and communicating PEs.
+
+The injector sends basic keyed messages. The output stream of the injector uses a key finder to partition the events across the application nodes.
+
+We get metrics from the probes across the codebase, in particular:
+- the rate of events sent per second (in the injector)
+- the rate of events received per second (in the app nodes)
+
+Metrics from the platform code are computed with weighted moving averages.
+
+Profiling options can easily be added to the injector or app nodes in order to identify hotspots.
+
+## Parameters
+
+We provide a script for that purpose: `bench-cluster.sh`
+
+Input parameters are:
+
+- host names on which to start S4 nodes
+	* they must either be localhost, or accessible through ssh (_a shared file system is assumed_)
+- injector configuration (see below)
+- node configuration (you __must__ specify the correct zookeeper connection string. By default, a server is created on the node where the `bench-cluster.sh` script is executed)
+
+ 
+Example configuration files are available in `/config`, and you can configure:
+
+- the number of keys
+- the number of warmup iterations
+- the number of test iterations
+- the number of parallel injection threads
+- etc…
+
+By default in this example the size of a message is 188 bytes.
+
+
+## Running
+
+Running 2 S4 nodes on the local machine:
+`./bench-cluster.sh "localhost localhost" $(pwd)/config/injector.config $(pwd)/config/node.config`
+
+For a distributed setup, you should modify the host names in the above command line, and specify the correct Zookeeper connection string in `node.config`.
+
+## Results
+
+
+When the benchmark finishes, results are available in `measurements/injectors` for the injection rates and in `measurements/node[0-n]` for other statistics.
+
+Most statistics files come from the probes of the platform and some of them use weighted moving averages. These are good for long running applications. For the benchmarks we also show instant rates, which are available in `injection-rate.csv` and `simplePE1.csv` files.
+

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/80dfe403/subprojects/s4-benchmarks/bench-cluster.sh
----------------------------------------------------------------------
diff --git a/subprojects/s4-benchmarks/bench-cluster.sh b/subprojects/s4-benchmarks/bench-cluster.sh
new file mode 100755
index 0000000..2b51037
--- /dev/null
+++ b/subprojects/s4-benchmarks/bench-cluster.sh
@@ -0,0 +1,58 @@
+#!/bin/bash -x
+
+
+HOSTS=$1
+INJECTOR_CONFIG=$2
+NODE_CONFIG=$3
+BENCH_ROOTDIR=`pwd`
+
+echo "hosts = $HOSTS"
+echo "injector config file = $INJECTOR_CONFIG"
+echo "node config file = $NODE_CONFIG"
+echo "bench root dir = $BENCH_ROOTDIR"
+
+killall -9 java
+
+cd $BENCH_ROOTDIR
+
+rm -Rf measurements/*
+
+$BENCH_ROOTDIR/../../gradlew -b=s4-benchmarks.gradle compileJava
+$BENCH_ROOTDIR/../../gradlew -b=s4-benchmarks.gradle cp
+
+NB_NODES=0
+for host in $HOSTS
+do
+	((NB_NODES++))
+done
+
+(cd $BENCH_ROOTDIR/../../ && ./s4 zkServer -clusters=c=testCluster1:flp=12000:nbTasks=1,c=testCluster2:flp=13000:nbTasks=$NB_NODES &)
+
+
+sleep 6
+
+BENCH=`date +"%Y-%m-%d--%H-%M-%S"`
+BENCH_DIR=$BENCH_ROOTDIR/$BENCH
+echo "bench dir is: $BENCH_DIR"
+mkdir $BENCH
+
+echo "nb nodes = $NB_NODES\n" > $BENCH/benchConf.txt
+echo "hosts = $HOSTS" >> $BENCH/benchConf.txt
+echo "injector config ">> $BENCH/benchConf.txt
+cat $INJECTOR_CONFIG >> $BENCH/benchConf.txt
+
+for host in $HOSTS
+do
+  if [ $host == "localhost" ] || [ $host == "127.0.0.1" ] ; then
+    $BENCH_ROOTDIR/startNode.sh $BENCH_ROOTDIR $NODE_CONFIG "localhost" > $BENCH_DIR/output_$i.log 2>$BENCH_DIR/s4err_$i.err < /dev/null &
+  else
+    ssh $host "$BENCH_ROOTDIR/startNode.sh $BENCH_ROOTDIR $NODE_CONFIG $host > $BENCH_DIR/output_$host.log 2>$BENCH_DIR/s4err_$host	.err < /dev/null &"
+  fi
+done
+
+sleep 15
+
+java -cp `cat classpath.txt` org.apache.s4.core.Main "@$INJECTOR_CONFIG"
+
+
+

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/80dfe403/subprojects/s4-benchmarks/config/injector.config
----------------------------------------------------------------------
diff --git a/subprojects/s4-benchmarks/config/injector.config b/subprojects/s4-benchmarks/config/injector.config
new file mode 100644
index 0000000..365e734
--- /dev/null
+++ b/subprojects/s4-benchmarks/config/injector.config
@@ -0,0 +1,3 @@
+-c=testCluster1
+-appClass=org.apache.s4.benchmark.simpleApp1.Injector
+-p=s4.adapter.output.stream=inputStream,s4.benchmark.keysCount=4,s4.benchmark.warmupIterations=100000,s4.benchmark.testIterations=500000,s4.benchmark.testSleepInterval=0,s4.benchmark.warmupSleepInterval=0,s4.benchmark.injector.parallelism=2

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/80dfe403/subprojects/s4-benchmarks/config/node.config
----------------------------------------------------------------------
diff --git a/subprojects/s4-benchmarks/config/node.config b/subprojects/s4-benchmarks/config/node.config
new file mode 100644
index 0000000..a5b83b2
--- /dev/null
+++ b/subprojects/s4-benchmarks/config/node.config
@@ -0,0 +1,4 @@
+-c=testCluster2
+-appClass=org.apache.s4.benchmark.simpleApp1.SimpleApp
+-p=s4.adapter.output.stream=inputStream
+-zk=127.0.0.1:2181
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/80dfe403/subprojects/s4-benchmarks/s4-benchmarks.gradle
----------------------------------------------------------------------
diff --git a/subprojects/s4-benchmarks/s4-benchmarks.gradle b/subprojects/s4-benchmarks/s4-benchmarks.gradle
new file mode 100644
index 0000000..2fc10bc
--- /dev/null
+++ b/subprojects/s4-benchmarks/s4-benchmarks.gradle
@@ -0,0 +1,82 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+
+project.ext["s4AppInstallDir"] = hasProperty('appsDir') ? "$appsDir" : "/tmp/appsDir"
+
+project.ext["s4Version"] = '0.5.0-incubating'
+description = 'Apache S4 App'
+//defaultTasks 'installS4R'
+project.ext["archivesBaseName"] = "$project.name"
+project.ext["distRootFolder"] = "$archivesBaseName-${-> version}"
+
+
+// Append the suffix 'SNAPSHOT' when the build is not for release.
+//version = new Version(major: 0, minor: 0, bugfix: 0, isRelease: false)
+group = 'org.apache.s4'
+
+apply plugin: 'java'
+apply plugin: 'eclipse'
+apply plugin: 'idea'
+apply plugin:'application'
+
+/* The app classname is set automatically from the source files. */
+def appClassname = ''
+
+/* Set Java version. */
+sourceCompatibility = 1.6
+targetCompatibility = 1.6
+
+repositories {
+    mavenLocal()
+    mavenCentral()
+    mavenRepo name: "gson", url: "http://google-gson.googlecode.com/svn/mavenrepo"
+
+    /* Add lib dir as a repo. Some jar files that are not available
+     in a public repo are distributed in the lib dir. */
+}
+
+
+dependencies {
+
+   /* S4 Platform. We only need the API, not the transitive dependencies. */
+
+    compile project( ":s4-base" )
+    compile project( ":s4-core" )
+    compile project( ":s4-comm" )
+    compile (libraries.jcommander)
+    compile (libraries.zkclient)
+    compile (libraries.metrics)
+    
+   
+
+   // if you need to use the twitter4j lib defined above, you must reference it here as a dependency
+   // compile (libraries.twitter4j_core)
+
+
+}
+
+
+
+task cp << {
+    description='Dumps the classpath for running a class from this project, into a \'classpath.txt\' file in the current directory'
+    String rt = ""
+    configurations.runtime.files.each{File file -> rt+=(file.path+File.pathSeparator) }
+    new File("classpath.txt").write(sourceSets.main.output.classesDir.path + File.pathSeparator + rt + File.pathSeparator + '/Users/matthieu/apache-s4-0.5.0-incubating-bin' + '/subprojects/s4-tools/build/install/s4-tools/lib/*')
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/80dfe403/subprojects/s4-benchmarks/src/main/java/org/apache/s4/benchmark/simpleApp1/Injector.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-benchmarks/src/main/java/org/apache/s4/benchmark/simpleApp1/Injector.java b/subprojects/s4-benchmarks/src/main/java/org/apache/s4/benchmark/simpleApp1/Injector.java
new file mode 100644
index 0000000..614cd59
--- /dev/null
+++ b/subprojects/s4-benchmarks/src/main/java/org/apache/s4/benchmark/simpleApp1/Injector.java
@@ -0,0 +1,235 @@
+package org.apache.s4.benchmark.simpleApp1;
+
+import java.io.File;
+import java.math.BigDecimal;
+import java.math.MathContext;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.s4.base.Event;
+import org.apache.s4.base.KeyFinder;
+import org.apache.s4.benchmark.utils.Utils;
+import org.apache.s4.comm.topology.ZNRecord;
+import org.apache.s4.comm.topology.ZNRecordSerializer;
+import org.apache.s4.comm.topology.ZkClient;
+import org.apache.s4.core.RemoteStream;
+import org.apache.s4.core.adapter.AdapterApp;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.ImmutableList;
+import com.google.inject.Inject;
+import com.google.inject.name.Named;
+import com.yammer.metrics.Metrics;
+import com.yammer.metrics.core.Gauge;
+import com.yammer.metrics.reporting.ConsoleReporter;
+import com.yammer.metrics.reporting.CsvReporter;
+
+public class Injector extends AdapterApp {
+
+    private static Logger logger = LoggerFactory.getLogger(Injector.class);
+
+    @Inject
+    @Named("s4.benchmark.warmupIterations")
+    long warmupIterations;
+
+    @Inject
+    @Named("s4.benchmark.testIterations")
+    long testIterations;
+
+    @Inject
+    @Named("s4.benchmark.keysCount")
+    int keysCount;
+
+    @Inject
+    @Named("s4.benchmark.warmupSleepInterval")
+    int warmupSleepInterval;
+
+    @Inject
+    @Named("s4.benchmark.testSleepInterval")
+    int testSleepInterval;
+
+    @Inject
+    @Named("s4.cluster.zk_address")
+    String zkString;
+
+    @Inject
+    @Named("s4.benchmark.injector.parallelism")
+    int parallelism;
+
+    // Meter meter = Metrics.newMeter(Injector.class, "injector", "injected", TimeUnit.SECONDS);
+
+    static AtomicLong counter = new AtomicLong();
+    static AtomicLong eventCountPerInterval = new AtomicLong();
+    BigDecimal rate;
+    volatile long lastTime = -1;
+
+    @Override
+    protected void onInit() {
+
+        File logDir = new File(System.getProperty("user.dir") + "/measurements/injectors");
+        if (!logDir.mkdirs()) {
+            throw new RuntimeException("Cannot create dir " + logDir.getAbsolutePath());
+        }
+        CsvReporter.enable(logDir, 5, TimeUnit.SECONDS);
+        remoteStreamKeyFinder = new KeyFinder<Event>() {
+
+            @Override
+            public List<String> get(Event event) {
+                return ImmutableList.of(event.get("key"));
+            }
+        };
+        super.onInit();
+        ConsoleReporter.enable(30, TimeUnit.SECONDS);
+        ZkClient zkClient = new ZkClient(zkString);
+        zkClient.createPersistent("/benchmarkConfig");
+        zkClient.createPersistent("/benchmarkConfig/warmupIterations", warmupIterations * parallelism);
+        zkClient.createPersistent("/benchmarkConfig/testIterations", testIterations * parallelism);
+        zkClient.close();
+    }
+
+    @Override
+    protected void onStart() {
+
+        Executors.newScheduledThreadPool(1).scheduleAtFixedRate(new Runnable() {
+
+            @Override
+            public void run() {
+                if (lastTime == -1) {
+                    lastTime = System.currentTimeMillis();
+                } else {
+                    if ((System.currentTimeMillis() - lastTime) > 1000) {
+                        rate = new BigDecimal(eventCountPerInterval.getAndSet(0)).divide(
+                                new BigDecimal(System.currentTimeMillis() - lastTime), MathContext.DECIMAL64).multiply(
+                                new BigDecimal(1000), MathContext.DECIMAL64);
+                        lastTime = System.currentTimeMillis();
+                    }
+                }
+
+            }
+        }, 1, 1, TimeUnit.SECONDS);
+
+        Metrics.newGauge(Injector.class, "injection-rate", new Gauge<BigDecimal>() {
+
+            @Override
+            public BigDecimal value() {
+                return rate;
+            }
+        });
+
+        CountDownLatch signalWarmupComplete = Utils.getReadySignal(zkString, "/warmup", keysCount);
+
+        RemoteStream remoteStream = getRemoteStream();
+        generateEvents(remoteStream, warmupIterations, keysCount, warmupSleepInterval, parallelism);
+
+        generateStopEvent(remoteStream, -1, keysCount);
+
+        // now that we are certain app nodes are connected, check the target cluster
+        ZkClient zkClient = new ZkClient(zkString);
+        zkClient.setZkSerializer(new ZNRecordSerializer());
+        ZNRecord readData = zkClient.readData("/s4/streams/" + getRemoteStream().getName() + "/consumers/"
+                + zkClient.getChildren("/s4/streams/" + getRemoteStream().getName() + "/consumers").get(0));
+        String remoteClusterName = readData.getSimpleField("clusterName");
+
+        int appPartitionCount = zkClient.countChildren("/s4/clusters/" + remoteClusterName + "/tasks");
+        zkClient.close();
+        CountDownLatch signalBenchComplete = Utils.getReadySignal(zkString, "/test", appPartitionCount);
+
+        try {
+            signalWarmupComplete.await();
+        } catch (InterruptedException e) {
+            e.printStackTrace();
+        }
+        logger.info("Warmup over with {} iterations over {} keys", warmupIterations, keysCount);
+        counter.set(0);
+
+        generateEvents(remoteStream, testIterations, keysCount, testSleepInterval, parallelism);
+
+        generateStopEvent(remoteStream, -2, appPartitionCount);
+        try {
+            // only need 1 message/partition. Upon reception, a znode is written and the node exits
+            signalBenchComplete.await();
+        } catch (InterruptedException e) {
+            e.printStackTrace();
+        }
+
+        logger.info("Tests completed after {} warmup and {} test events", warmupIterations * parallelism * keysCount,
+                testIterations * parallelism * keysCount);
+
+        System.exit(0);
+    }
+
+    private void generateEvents(RemoteStream remoteStream, long iterations, int keysCount, int sleepInterval,
+            int parallelism) {
+
+        ExecutorService threadPool = Executors.newFixedThreadPool(parallelism);
+        for (int i = 0; i < parallelism; i++) {
+            threadPool.submit(new InjectionTask(iterations, remoteStream, sleepInterval));
+        }
+
+        threadPool.shutdown();
+        try {
+            threadPool.awaitTermination(10, TimeUnit.MINUTES);
+        } catch (InterruptedException e) {
+            e.printStackTrace();
+        }
+    }
+
+    private void generateStopEvent(RemoteStream remoteStream, long stopKey, int keysCount) {
+
+        ExecutorService threadPool = Executors.newFixedThreadPool(1);
+        for (int j = 0; j < keysCount; j++) {
+            Event event = new Event();
+            event.put("key", Integer.class, j);
+            event.put("value", Long.class, stopKey);
+            logger.info("Sending stop event with key {}", stopKey);
+            remoteStream.put(event);
+        }
+        threadPool.shutdown();
+        try {
+            threadPool.awaitTermination(1, TimeUnit.MINUTES);
+        } catch (InterruptedException e) {
+            e.printStackTrace();
+        }
+    }
+
+    class InjectionTask implements Runnable {
+
+        private long iterations;
+        private RemoteStream remoteStream;
+        private long sleepInterval;
+
+        public InjectionTask(long iterations, RemoteStream remoteStream, long sleepInterval) {
+            super();
+            this.iterations = iterations;
+            this.remoteStream = remoteStream;
+            this.sleepInterval = sleepInterval;
+
+        }
+
+        @Override
+        public void run() {
+            for (long i = 0; i < iterations; i++) {
+                for (int j = 0; j < keysCount; j++) {
+                    Event event = new Event();
+                    event.put("key", Integer.class, j);
+                    event.put("value", Long.class, counter.incrementAndGet());
+                    // logger.info("{}/{}/{}/",
+                    // new String[] { Thread.currentThread().getName(), String.valueOf(i), String.valueOf(j),
+                    // String.valueOf(event.get("value")) });
+                    remoteStream.put(event);
+                    eventCountPerInterval.incrementAndGet();
+                    try {
+                        Thread.sleep(sleepInterval);
+                    } catch (InterruptedException e) {
+                        e.printStackTrace();
+                    }
+                }
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/80dfe403/subprojects/s4-benchmarks/src/main/java/org/apache/s4/benchmark/simpleApp1/SimpleApp.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-benchmarks/src/main/java/org/apache/s4/benchmark/simpleApp1/SimpleApp.java b/subprojects/s4-benchmarks/src/main/java/org/apache/s4/benchmark/simpleApp1/SimpleApp.java
new file mode 100644
index 0000000..6385bc9
--- /dev/null
+++ b/subprojects/s4-benchmarks/src/main/java/org/apache/s4/benchmark/simpleApp1/SimpleApp.java
@@ -0,0 +1,69 @@
+package org.apache.s4.benchmark.simpleApp1;
+
+import java.io.File;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.s4.base.Event;
+import org.apache.s4.base.KeyFinder;
+import org.apache.s4.comm.topology.ZkClient;
+import org.apache.s4.core.App;
+
+import com.google.common.collect.ImmutableList;
+import com.google.inject.Inject;
+import com.google.inject.name.Named;
+import com.yammer.metrics.reporting.ConsoleReporter;
+import com.yammer.metrics.reporting.CsvReporter;
+
+public class SimpleApp extends App {
+
+    @Inject
+    @Named("s4.cluster.zk_address")
+    String zkString;
+
+    @Override
+    protected void onStart() {
+        // TODO Auto-generated method stub
+    }
+
+    @Override
+    protected void onInit() {
+        File logDirectory = new File(System.getProperty("user.dir") + "/measurements/node"
+                + getReceiver().getPartition());
+        if (!logDirectory.exists()) {
+            if (!logDirectory.mkdirs()) {
+                throw new RuntimeException("Cannot create log dir " + logDirectory.getAbsolutePath());
+            }
+        }
+        CsvReporter.enable(logDirectory, 5, TimeUnit.SECONDS);
+        ConsoleReporter.enable(30, TimeUnit.SECONDS);
+
+        SimplePE1 simplePE1 = createPE(SimplePE1.class, "simplePE1");
+        ZkClient zkClient = new ZkClient(zkString);
+        zkClient.waitUntilExists("/benchmarkConfig/warmupIterations", TimeUnit.SECONDS, 60);
+        Long warmupIterations = zkClient.readData("/benchmarkConfig/warmupIterations");
+        Long testIterations = zkClient.readData("/benchmarkConfig/testIterations");
+
+        simplePE1.setWarmupIterations(warmupIterations);
+        simplePE1.setTestIterations(testIterations);
+        createInputStream("inputStream", new KeyFinder<Event>() {
+
+            @Override
+            public List<String> get(Event event) {
+                return ImmutableList.of(event.get("key"));
+            }
+        }, simplePE1);
+
+    }
+
+    @Override
+    protected void onClose() {
+        // TODO Auto-generated method stub
+
+    }
+
+    public String getZkString() {
+        return zkString;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/80dfe403/subprojects/s4-benchmarks/src/main/java/org/apache/s4/benchmark/simpleApp1/SimplePE1.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-benchmarks/src/main/java/org/apache/s4/benchmark/simpleApp1/SimplePE1.java b/subprojects/s4-benchmarks/src/main/java/org/apache/s4/benchmark/simpleApp1/SimplePE1.java
new file mode 100644
index 0000000..1fd40fb
--- /dev/null
+++ b/subprojects/s4-benchmarks/src/main/java/org/apache/s4/benchmark/simpleApp1/SimplePE1.java
@@ -0,0 +1,91 @@
+package org.apache.s4.benchmark.simpleApp1;
+
+import java.math.BigDecimal;
+import java.math.MathContext;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.I0Itec.zkclient.ZkClient;
+import org.apache.s4.base.Event;
+import org.apache.s4.core.ProcessingElement;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.yammer.metrics.Metrics;
+import com.yammer.metrics.core.Gauge;
+
+public class SimplePE1 extends ProcessingElement {
+
+    private static Logger logger = LoggerFactory.getLogger(SimplePE1.class);
+
+    private long warmupIterations = -1;
+    boolean warmedUp = false;
+    private long testIterations = -1;
+    AtomicLong counter = new AtomicLong();
+    BigDecimal rate;
+    long lastTime = -1;
+
+    public void setWarmupIterations(long warmupIterations) {
+        this.warmupIterations = warmupIterations;
+    }
+
+    public void setTestIterations(long testIterations) {
+        this.testIterations = testIterations;
+    }
+
+    public void onEvent(Event event) {
+        counter.incrementAndGet();
+
+        if (lastTime == -1) {
+            lastTime = System.currentTimeMillis();
+        } else {
+            if ((System.currentTimeMillis() - lastTime) > 1000) {
+                rate = new BigDecimal(counter.get()).divide(new BigDecimal(System.currentTimeMillis() - lastTime),
+                        MathContext.DECIMAL64).multiply(new BigDecimal(1000));
+
+                counter.set(0);
+                lastTime = System.currentTimeMillis();
+            }
+        }
+
+        Long value = event.get("value", Long.class);
+        // logger.info("reached value {}", value);
+        if (!warmedUp && (value == -1)) {
+            logger.info("**** Warmed up");
+            addSequentialNode("/warmup");
+            warmedUp = true;
+
+        } else if (value == (-2)) {
+            logger.info("******* finished **************");
+
+            addSequentialNode("/test");
+            System.exit(0);
+            logger.info("SADFASFDASFDASFDASFDASDFASDFASDF**************");
+
+        }
+
+    }
+
+    private void addSequentialNode(String parent) {
+        ZkClient zkClient = new ZkClient(((SimpleApp) getApp()).getZkString());
+        zkClient.createPersistentSequential(parent + "/done", new byte[0]);
+        zkClient.close();
+    }
+
+    @Override
+    protected void onCreate() {
+        Metrics.newGauge(SimplePE1.class, "simplePE1", new Gauge<BigDecimal>() {
+
+            @Override
+            public BigDecimal value() {
+                return rate;
+            }
+        });
+    }
+
+    @Override
+    protected void onRemove() {
+        // TODO Auto-generated method stub
+
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/80dfe403/subprojects/s4-benchmarks/src/main/java/org/apache/s4/benchmark/utils/Utils.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-benchmarks/src/main/java/org/apache/s4/benchmark/utils/Utils.java b/subprojects/s4-benchmarks/src/main/java/org/apache/s4/benchmark/utils/Utils.java
new file mode 100644
index 0000000..2f28feb
--- /dev/null
+++ b/subprojects/s4-benchmarks/src/main/java/org/apache/s4/benchmark/utils/Utils.java
@@ -0,0 +1,39 @@
+package org.apache.s4.benchmark.utils;
+
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+
+import org.I0Itec.zkclient.IZkChildListener;
+import org.I0Itec.zkclient.ZkClient;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class Utils {
+
+    private static Logger logger = LoggerFactory.getLogger(Utils.class);
+
+    public static CountDownLatch getReadySignal(String zkString, final String parentPath, final int counts) {
+        ZkClient zkClient = new ZkClient(zkString);
+        if (zkClient.exists(parentPath)) {
+            System.out.println(parentPath + " path exists and will be deleted");
+            zkClient.deleteRecursive(parentPath);
+        }
+        zkClient.createPersistent(parentPath);
+        final CountDownLatch signalReady = new CountDownLatch(1);
+        zkClient.subscribeChildChanges(parentPath, new IZkChildListener() {
+
+            @Override
+            public void handleChildChange(String arg0, List<String> arg1) throws Exception {
+
+                if (parentPath.equals(arg0)) {
+                    if (arg1.size() == counts) {
+                        logger.info("Latch reached for {} with {} children", arg0, counts);
+                        signalReady.countDown();
+                    }
+                }
+            }
+        });
+        return signalReady;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/80dfe403/subprojects/s4-benchmarks/src/main/resources/logback.xml
----------------------------------------------------------------------
diff --git a/subprojects/s4-benchmarks/src/main/resources/logback.xml b/subprojects/s4-benchmarks/src/main/resources/logback.xml
new file mode 100644
index 0000000..ea8c85a
--- /dev/null
+++ b/subprojects/s4-benchmarks/src/main/resources/logback.xml
@@ -0,0 +1,14 @@
<!-- Logback configuration for the benchmarking framework: logs everything at
     DEBUG level and above to the console. -->
<configuration>

  <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
    <!-- encoders are assigned the type
         ch.qos.logback.classic.encoder.PatternLayoutEncoder by default -->
    <encoder>
      <!-- timestamp, thread, level, abbreviated logger name, message -->
      <pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n</pattern>
    </encoder>
  </appender>

  <!-- DEBUG is verbose; raise to INFO to reduce logging overhead during benchmarks -->
  <root level="debug">
    <appender-ref ref="STDOUT" />
  </root>
</configuration>

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/80dfe403/subprojects/s4-benchmarks/startNode.sh
----------------------------------------------------------------------
diff --git a/subprojects/s4-benchmarks/startNode.sh b/subprojects/s4-benchmarks/startNode.sh
new file mode 100755
index 0000000..53ba4b1
--- /dev/null
+++ b/subprojects/s4-benchmarks/startNode.sh
@@ -0,0 +1,21 @@
#!/bin/bash -x
# Starts a benchmark S4 node.
#
# Usage: startNode.sh <bench root dir> <node config file> <host>
#   bench root dir   : directory containing classpath.txt
#   node config file : passed to org.apache.s4.core.Main as @<file>
#   host             : hostname this script runs on (controls cleanup behavior)
BENCH_ROOTDIR=$1
NODE_CONFIG=$2
host=$3

if [ "$host" == "localhost" ] || [ "$host" == "127.0.0.1" ] ; then
  echo "start on localhost"
else
  # On remote benchmark machines, clean up any stale node from a previous run.
  # NOTE(review): this kills EVERY java process on the host; only acceptable on
  # machines dedicated to benchmarking.
  killall -9 java
fi

# quote the path and abort if it does not exist, instead of launching from the wrong directory
cd "$BENCH_ROOTDIR" || exit 1


# you may add profiling to the application nodes using the correct options for your system
#PROFILING_OPTS="-agentpath:/Applications/YourKit_Java_Profiler_11.0.8.app/bin/mac/libyjpagent.jnilib=delay=10000,onexit=snapshot,onexit=memory,sampling,monitors"
PROFILING_OPTS=""

# classpath.txt is generated by the build; quoting keeps paths with spaces intact
java $PROFILING_OPTS -server -cp "$(cat classpath.txt)" org.apache.s4.core.Main "@$NODE_CONFIG" &

# java -cp "$(cat classpath.txt)" org.apache.s4.core.Main "@$(pwd)/src/main/resources/injector.config"

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/80dfe403/subprojects/s4-comm/src/main/java/org/apache/s4/comm/serialize/KryoSerDeser.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/serialize/KryoSerDeser.java b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/serialize/KryoSerDeser.java
index b7fad75..f622de6 100644
--- a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/serialize/KryoSerDeser.java
+++ b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/serialize/KryoSerDeser.java
@@ -28,8 +28,8 @@ import com.esotericsoftware.kryo.serialize.ClassSerializer;
 import com.esotericsoftware.kryo.serialize.SimpleSerializer;
 
 /**
- * Serializazer/deserializer based on <a href="http://code.google.com/p/kryo/">kryo</a>
- *
+ * Serializer/deserializer based on <a href="http://code.google.com/p/kryo/">kryo</a>
+ * 
  */
 public class KryoSerDeser implements SerializerDeserializer {
 

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/80dfe403/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/RemoteEmitters.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/RemoteEmitters.java b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/RemoteEmitters.java
index 85dc86d..63aa1f0 100644
--- a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/RemoteEmitters.java
+++ b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/RemoteEmitters.java
@@ -18,8 +18,8 @@
 
 package org.apache.s4.comm.tcp;
 
-import java.util.HashMap;
-import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
 
 import org.apache.s4.base.RemoteEmitter;
 import org.apache.s4.comm.RemoteEmitterFactory;
@@ -35,7 +35,7 @@ import com.google.inject.Singleton;
 @Singleton
 public class RemoteEmitters {
 
-    Map<Cluster, RemoteEmitter> emitters = new HashMap<Cluster, RemoteEmitter>();
+    ConcurrentMap<Cluster, RemoteEmitter> emitters = new ConcurrentHashMap<Cluster, RemoteEmitter>();
 
     @Inject
     RemoteEmitterFactory emitterFactory;
@@ -43,10 +43,12 @@ public class RemoteEmitters {
     public RemoteEmitter getEmitter(Cluster topology) {
         RemoteEmitter emitter = emitters.get(topology);
         if (emitter == null) {
-            emitter = emitterFactory.createRemoteEmitter(topology);
-            emitters.put(topology, emitter);
+            RemoteEmitter newEmitter = emitterFactory.createRemoteEmitter(topology);
+            emitter = emitters.putIfAbsent(topology, newEmitter);
+            if (emitter == null) {
+                emitter = newEmitter;
+            }
         }
         return emitter;
     }
-
 }

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/80dfe403/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/TCPEmitter.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/TCPEmitter.java b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/TCPEmitter.java
index 44c9247..a6cc8b5 100644
--- a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/TCPEmitter.java
+++ b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/TCPEmitter.java
@@ -31,6 +31,8 @@ import org.apache.s4.base.SerializerDeserializer;
 import org.apache.s4.comm.topology.Cluster;
 import org.apache.s4.comm.topology.ClusterChangeListener;
 import org.apache.s4.comm.topology.ClusterNode;
+import org.apache.s4.comm.util.CommMetrics;
+import org.apache.s4.comm.util.CommMetrics.EmitterMetrics;
 import org.jboss.netty.bootstrap.ClientBootstrap;
 import org.jboss.netty.buffer.ChannelBuffer;
 import org.jboss.netty.buffer.ChannelBuffers;
@@ -91,6 +93,8 @@ public class TCPEmitter implements Emitter, ClusterChangeListener {
     @Inject
     SerializerDeserializer serDeser;
 
+    CommMetrics.EmitterMetrics metrics;
+
     @Inject
     public TCPEmitter(Cluster topology, @Named("s4.comm.timeout") int timeout) throws InterruptedException {
         this.nettyTimeout = timeout;
@@ -120,12 +124,14 @@ public class TCPEmitter implements Emitter, ClusterChangeListener {
         bootstrap.setOption("keepAlive", true);
         bootstrap.setOption("reuseAddress", true);
         bootstrap.setOption("connectTimeoutMillis", this.nettyTimeout);
+
     }
 
     @Inject
     private void init() {
         refreshCluster();
         this.topology.addListener(this);
+        metrics = new EmitterMetrics(topology);
     }
 
     private boolean connectTo(Integer partitionId) {
@@ -161,14 +167,17 @@ public class TCPEmitter implements Emitter, ClusterChangeListener {
 
         if (!partitionChannelMap.containsKey(partitionId)) {
             if (!connectTo(partitionId)) {
+                logger.warn("Could not connect to partition {}, discarding message", partitionId);
                 // Couldn't connect, discard message
                 return;
             }
         }
 
         Channel c = partitionChannelMap.get(partitionId);
-        if (c == null)
+        if (c == null) {
+            logger.warn("Could not find channel for partition {}", partitionId);
             return;
+        }
 
         c.write(buffer).addListener(new MessageSendingListener(partitionId));
     }
@@ -269,8 +278,12 @@ public class TCPEmitter implements Emitter, ClusterChangeListener {
                     logger.warn("Failed to send message to node {} (according to current cluster information)",
                             topology.getPhysicalCluster().getNodes().get(partitionId));
                 } catch (IndexOutOfBoundsException ignored) {
+                    logger.error("Failed to send message to partition {}", partitionId);
                     // cluster was changed
                 }
+            } else {
+                metrics.sentMessage(partitionId);
+
             }
 
         }

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/80dfe403/subprojects/s4-comm/src/main/java/org/apache/s4/comm/util/CommMetrics.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/util/CommMetrics.java b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/util/CommMetrics.java
new file mode 100644
index 0000000..37ff121
--- /dev/null
+++ b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/util/CommMetrics.java
@@ -0,0 +1,30 @@
package org.apache.s4.comm.util;

import java.util.concurrent.TimeUnit;

import org.apache.s4.comm.tcp.TCPEmitter;
import org.apache.s4.comm.topology.Cluster;

import com.yammer.metrics.Metrics;
import com.yammer.metrics.core.Meter;

/**
 * Metrics for the communication layer, based on the yammer metrics library.
 */
public class CommMetrics {

    /**
     * Counts events successfully emitted through a {@link TCPEmitter}, with one meter
     * (events/second) per partition of the target cluster.
     */
    public static class EmitterMetrics {
        // one meter per partition, indexed by partition id
        private Meter[] emittersMeters;

        /**
         * Registers one "event-emitted" meter per partition of the given cluster. Meter names
         * encode the cluster name and partition id so clusters/partitions can be told apart
         * in the metrics registry.
         */
        public EmitterMetrics(Cluster cluster) {
            emittersMeters = new Meter[cluster.getPhysicalCluster().getPartitionCount()];
            for (int i = 0; i < cluster.getPhysicalCluster().getPartitionCount(); i++) {
                emittersMeters[i] = Metrics
                        .newMeter(TCPEmitter.class, "event-emitted@" + cluster.getPhysicalCluster().getName()
                                + "@partition-" + i, "event-emitted", TimeUnit.SECONDS);
            }
        }

        // marks one emitted event for the given partition
        // NOTE(review): assumes partitionId < partition count at construction time — a cluster
        // resize after construction would cause an ArrayIndexOutOfBoundsException; verify callers
        public void sentMessage(int partitionId) {
            emittersMeters[partitionId].mark();
        }
    }

}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/80dfe403/subprojects/s4-core/src/main/java/org/apache/s4/core/App.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/App.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/App.java
index 936d225..cf64080 100644
--- a/subprojects/s4-core/src/main/java/org/apache/s4/core/App.java
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/App.java
@@ -41,7 +41,7 @@ import com.google.inject.name.Named;
 
 /**
  * Container base class to hold all processing elements.
- *
+ * 
  * It is also where one defines the application graph: PE prototypes, internal streams, input and output streams.
  */
 public abstract class App {

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/80dfe403/subprojects/s4-core/src/main/java/org/apache/s4/core/Main.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/Main.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/Main.java
index fc85219..b9ccef5 100644
--- a/subprojects/s4-core/src/main/java/org/apache/s4/core/Main.java
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/Main.java
@@ -62,6 +62,15 @@ public class Main {
      */
     public static void main(String[] args) {
 
+        Thread.setDefaultUncaughtExceptionHandler(new UncaughtExceptionHandler() {
+
+            @Override
+            public void uncaughtException(Thread t, Throwable e) {
+                logger.error("Uncaught exception in thread {}", t.getName(), e);
+
+            }
+        });
+
         MainArgs mainArgs = new MainArgs();
         JCommander jc = new JCommander(mainArgs);
 

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/80dfe403/subprojects/s4-core/src/main/java/org/apache/s4/core/ProcessingElement.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/ProcessingElement.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/ProcessingElement.java
index b899403..5429076 100644
--- a/subprojects/s4-core/src/main/java/org/apache/s4/core/ProcessingElement.java
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/ProcessingElement.java
@@ -39,6 +39,7 @@ import org.apache.s4.core.ft.CheckpointingConfig.CheckpointingMode;
 import org.apache.s4.core.ft.CheckpointingTask;
 import org.apache.s4.core.gen.OverloadDispatcher;
 import org.apache.s4.core.gen.OverloadDispatcherGenerator;
+import org.apache.s4.core.util.S4Metrics;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -51,6 +52,9 @@ import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.MapMaker;
 import com.google.common.collect.Maps;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import com.yammer.metrics.Metrics;
+import com.yammer.metrics.core.Timer;
+import com.yammer.metrics.core.TimerContext;
 
 /**
  * <p>
@@ -141,6 +145,8 @@ public abstract class ProcessingElement implements Cloneable {
     transient private boolean recoveryAttempted = false;
     transient private boolean dirty = false;
 
+    transient private Timer processingTimer;
+
     transient private CheckpointingConfig checkpointingConfig = new CheckpointingConfig.Builder(CheckpointingMode.NONE)
             .build();
 
@@ -158,7 +164,6 @@ public abstract class ProcessingElement implements Cloneable {
                 return createPE(key);
             }
         });
-
         triggers = new MapMaker().makeMap();
 
         /*
@@ -166,6 +171,10 @@ public abstract class ProcessingElement implements Cloneable {
          * to the prototype.
          */
         this.pePrototype = this;
+
+        S4Metrics.createCacheGauges(peInstances);
+
+        processingTimer = Metrics.newTimer(getClass(), getClass().getName() + "-pe-processing-time");
     }
 
     /**
@@ -431,6 +440,7 @@ public abstract class ProcessingElement implements Cloneable {
 
     protected void handleInputEvent(Event event) {
 
+        TimerContext timerContext = processingTimer.time();
         Object object;
         if (isThreadSafe) {
             object = new Object(); // a dummy object TODO improve this.
@@ -459,6 +469,7 @@ public abstract class ProcessingElement implements Cloneable {
                 checkpoint();
             }
         }
+        timerContext.stop();
     }
 
     protected boolean isCheckpointable() {

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/80dfe403/subprojects/s4-core/src/main/java/org/apache/s4/core/Receiver.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/Receiver.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/Receiver.java
index 6c0b19c..23c8f2d 100644
--- a/subprojects/s4-core/src/main/java/org/apache/s4/core/Receiver.java
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/Receiver.java
@@ -24,6 +24,7 @@ import org.apache.s4.base.Event;
 import org.apache.s4.base.EventMessage;
 import org.apache.s4.base.Listener;
 import org.apache.s4.base.SerializerDeserializer;
+import org.apache.s4.core.util.S4Metrics;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -67,7 +68,7 @@ public class Receiver implements Runnable {
         streams = new MapMaker().makeMap();
     }
 
-    int getPartition() {
+    public int getPartition() {
         return listener.getPartitionId();
     }
 
@@ -98,6 +99,7 @@ public class Receiver implements Runnable {
         // here?
         while (!Thread.interrupted()) {
             byte[] message = listener.recv();
+            S4Metrics.receivedEvent(message.length);
             EventMessage event = (EventMessage) serDeser.deserialize(message);
 
             int appId = Integer.valueOf(event.getAppName());

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/80dfe403/subprojects/s4-core/src/main/java/org/apache/s4/core/RemoteSender.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/RemoteSender.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/RemoteSender.java
index daccbaa..2d772bf 100644
--- a/subprojects/s4-core/src/main/java/org/apache/s4/core/RemoteSender.java
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/RemoteSender.java
@@ -18,33 +18,44 @@
 
 package org.apache.s4.core;
 
+import java.util.concurrent.atomic.AtomicInteger;
+
 import org.apache.s4.base.Emitter;
 import org.apache.s4.base.EventMessage;
 import org.apache.s4.base.Hasher;
+import org.apache.s4.core.util.S4Metrics;
 
 /**
  * Sends events to a remote cluster.
- *
+ * 
  */
 public class RemoteSender {
 
     final private Emitter emitter;
     final private Hasher hasher;
-    int targetPartition = 0;
+    AtomicInteger targetPartition = new AtomicInteger();
+    final private String remoteClusterName;
 
-    public RemoteSender(Emitter emitter, Hasher hasher) {
+    public RemoteSender(Emitter emitter, Hasher hasher, String clusterName) {
         super();
         this.emitter = emitter;
         this.hasher = hasher;
+        this.remoteClusterName = clusterName;
+
+        S4Metrics.createRemoteStreamMeters(clusterName, emitter.getPartitionCount());
+
     }
 
     public void send(String hashKey, EventMessage eventMessage) {
+        int partition;
         if (hashKey == null) {
             // round robin by default
-            emitter.send(Math.abs(targetPartition++ % emitter.getPartitionCount()), eventMessage);
+            partition = Math.abs(targetPartition.incrementAndGet() % emitter.getPartitionCount());
         } else {
-            int partition = (int) (hasher.hash(hashKey) % emitter.getPartitionCount());
-            emitter.send(partition, eventMessage);
+            partition = (int) (hasher.hash(hashKey) % emitter.getPartitionCount());
         }
+        emitter.send(partition, eventMessage);
+        S4Metrics.sentEventToRemoteCluster(remoteClusterName, partition);
+
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/80dfe403/subprojects/s4-core/src/main/java/org/apache/s4/core/RemoteSenders.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/RemoteSenders.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/RemoteSenders.java
index 3a111d0..282e747 100644
--- a/subprojects/s4-core/src/main/java/org/apache/s4/core/RemoteSenders.java
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/RemoteSenders.java
@@ -18,9 +18,9 @@
 
 package org.apache.s4.core;
 
-import java.util.HashMap;
-import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
 
 import org.apache.s4.base.Event;
 import org.apache.s4.base.EventMessage;
@@ -38,7 +38,7 @@ import com.google.inject.Inject;
 /**
  * Sends events to remote clusters. Target clusters are selected dynamically based on the stream name information from
  * the event.
- *
+ * 
  */
 public class RemoteSenders {
 
@@ -59,25 +59,29 @@ public class RemoteSenders {
     @Inject
     Hasher hasher;
 
-    Map<String, RemoteSender> sendersByTopology = new HashMap<String, RemoteSender>();
+    ConcurrentMap<String, RemoteSender> sendersByTopology = new ConcurrentHashMap<String, RemoteSender>();
 
     public void send(String hashKey, Event event) {
 
         Set<StreamConsumer> consumers = streams.getConsumers(event.getStreamName());
+        event.setAppId(-1);
+        EventMessage eventMessage = new EventMessage(String.valueOf(event.getAppId()), event.getStreamName(),
+                serDeser.serialize(event));
         for (StreamConsumer consumer : consumers) {
             // NOTE: even though there might be several ephemeral znodes for the same app and topology, they are
             // represented by a single stream consumer
             RemoteSender sender = sendersByTopology.get(consumer.getClusterName());
             if (sender == null) {
-                sender = new RemoteSender(emitters.getEmitter(topologies.getCluster(consumer.getClusterName())), hasher);
+                RemoteSender newSender = new RemoteSender(emitters.getEmitter(topologies.getCluster(consumer
+                        .getClusterName())), hasher, consumer.getClusterName());
                 // TODO cleanup when remote topologies die
-                sendersByTopology.put(consumer.getClusterName(), sender);
+                sender = sendersByTopology.putIfAbsent(consumer.getClusterName(), newSender);
+                if (sender == null) {
+                    sender = newSender;
+                }
             }
             // we must set the app id of the consumer app for correct dispatch within the consumer node
             // NOTE: this implies multiple serializations, there might be an optimization
-            event.setAppId(consumer.getAppId());
-            EventMessage eventMessage = new EventMessage(String.valueOf(event.getAppId()), event.getStreamName(),
-                    serDeser.serialize(event));
             sender.send(hashKey, eventMessage);
         }
 

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/80dfe403/subprojects/s4-core/src/main/java/org/apache/s4/core/RemoteStream.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/RemoteStream.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/RemoteStream.java
index c4a3798..a097c1d 100644
--- a/subprojects/s4-core/src/main/java/org/apache/s4/core/RemoteStream.java
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/RemoteStream.java
@@ -18,8 +18,6 @@
 
 package org.apache.s4.core;
 
-import java.util.concurrent.atomic.AtomicInteger;
-
 import org.apache.s4.base.Event;
 import org.apache.s4.base.Hasher;
 import org.apache.s4.base.Key;
@@ -37,18 +35,15 @@ public class RemoteStream implements Streamable<Event> {
     final private String name;
     final protected Key<Event> key;
     final static private String DEFAULT_SEPARATOR = "^";
-    // final private int id;
 
     RemoteSenders remoteSenders;
 
     Hasher hasher;
 
     int id;
-    private App app;
+    final private App app;
     private static Logger logger = LoggerFactory.getLogger(RemoteStream.class);
 
-    private static AtomicInteger remoteStreamCounter = new AtomicInteger();
-
     public RemoteStream(App app, String name, KeyFinder<Event> finder, RemoteSenders remoteSenders, Hasher hasher,
             RemoteStreams remoteStreams, String clusterName) {
         this.app = app;

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/80dfe403/subprojects/s4-core/src/main/java/org/apache/s4/core/Sender.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/Sender.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/Sender.java
index 5b0b03d..32cdde1 100644
--- a/subprojects/s4-core/src/main/java/org/apache/s4/core/Sender.java
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/Sender.java
@@ -25,6 +25,7 @@ import org.apache.s4.base.Hasher;
 import org.apache.s4.base.SerializerDeserializer;
 import org.apache.s4.comm.topology.Assignment;
 import org.apache.s4.comm.topology.ClusterNode;
+import org.apache.s4.core.util.S4Metrics;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -87,17 +88,18 @@ public class Sender {
      */
     public boolean checkAndSendIfNotLocal(String hashKey, Event event) {
         int partition = (int) (hasher.hash(hashKey) % emitter.getPartitionCount());
-
         if (partition == localPartitionId) {
             /* Hey we are in the same JVM, don't use the network. */
             return false;
         }
         send(partition,
                 new EventMessage(String.valueOf(event.getAppId()), event.getStreamName(), serDeser.serialize(event)));
+        S4Metrics.sentEvent(partition);
         return true;
     }
 
     private void send(int partition, EventMessage event) {
+
         emitter.send(partition, event);
     }
 
@@ -113,11 +115,14 @@ public class Sender {
         for (int i = 0; i < emitter.getPartitionCount(); i++) {
 
             /* Don't use the comm layer when we send to the same partition. */
-            if (localPartitionId != i)
+            if (localPartitionId != i) {
                 emitter.send(
                         i,
                         new EventMessage(String.valueOf(event.getAppId()), event.getStreamName(), serDeser
                                 .serialize(event)));
+                S4Metrics.sentEvent(i);
+
+            }
         }
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/80dfe403/subprojects/s4-core/src/main/java/org/apache/s4/core/Stream.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/Stream.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/Stream.java
index aa54dfc..b837d6d 100644
--- a/subprojects/s4-core/src/main/java/org/apache/s4/core/Stream.java
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/Stream.java
@@ -27,6 +27,7 @@ import org.apache.s4.base.EventMessage;
 import org.apache.s4.base.GenericKeyFinder;
 import org.apache.s4.base.Key;
 import org.apache.s4.base.KeyFinder;
+import org.apache.s4.core.util.S4Metrics;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -72,7 +73,7 @@ public class Stream<T extends Event> implements Runnable, Streamable {
     }
 
     public void start() {
-
+        S4Metrics.createStreamMeters(name);
         if (logger.isTraceEnabled()) {
             if (targetPEs != null) {
                 for (ProcessingElement pe : targetPEs) {
@@ -202,8 +203,12 @@ public class Stream<T extends Event> implements Runnable, Streamable {
                  * the queue.
                  */
                 sender.sendToRemotePartitions(event);
+
                 queue.put(new EventMessage(String.valueOf(event.getAppId()), event.getStreamName(), app.getSerDeser()
                         .serialize(event)));
+                // TODO abstraction around queue and add dropped counter
+                // TODO add counter for local events
+
             }
         } catch (InterruptedException e) {
             logger.error("Interrupted while waiting to put an event in the queue: {}.", e.getMessage());
@@ -217,6 +222,7 @@ public class Stream<T extends Event> implements Runnable, Streamable {
     public void receiveEvent(EventMessage event) {
         try {
             queue.put(event);
+            // TODO abstraction around queue and add dropped counter
         } catch (InterruptedException e) {
             logger.error("Interrupted while waiting to put an event in the queue: {}.", e.getMessage());
             Thread.currentThread().interrupt();
@@ -278,6 +284,7 @@ public class Stream<T extends Event> implements Runnable, Streamable {
             try {
                 /* Get oldest event in queue. */
                 EventMessage eventMessage = queue.take();
+                S4Metrics.dequeuedEvent(name);
 
                 @SuppressWarnings("unchecked")
                 T event = (T) app.getSerDeser().deserialize(eventMessage.getSerializedEvent());

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/80dfe403/subprojects/s4-core/src/main/java/org/apache/s4/core/ft/SafeKeeper.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/ft/SafeKeeper.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/ft/SafeKeeper.java
index 3ffc91b..5052410 100644
--- a/subprojects/s4-core/src/main/java/org/apache/s4/core/ft/SafeKeeper.java
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/ft/SafeKeeper.java
@@ -30,6 +30,7 @@ import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.s4.core.ProcessingElement;
+import org.apache.s4.core.util.S4Metrics.CheckpointingMetrics;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -178,10 +179,7 @@ public final class SafeKeeper implements CheckpointingFramework {
         try {
             futureSerializedState = serializeState(pe);
         } catch (RejectedExecutionException e) {
-            // if (monitor != null) {
-            // monitor.increment(MetricsName.checkpointing_dropped_from_serialization_queue.toString(), 1,
-            // S4_CORE_METRICS.toString());
-            // }
+            CheckpointingMetrics.rejectedSerializationTask();
             storageCallback.storageOperationResult(StorageResultCode.FAILURE,
                     "Serialization task queue is full. An older serialization task was dumped in order to serialize PE ["
                             + pe.getId() + "]" + "	Remaining capacity for the serialization task queue is ["
@@ -197,23 +195,14 @@ public final class SafeKeeper implements CheckpointingFramework {
 
     private Future<byte[]> serializeState(ProcessingElement pe) {
         Future<byte[]> future = serializationThreadPool.submit(new SerializeTask(pe));
-        // if (monitor != null) {
-        // monitor.increment(MetricsName.checkpointing_added_to_serialization_queue.toString(), 1,
-        // S4_CORE_METRICS.toString());
-        // }
         return future;
     }
 
     private void submitSaveStateTask(SaveStateTask task, StorageCallback storageCallback) {
         try {
             storageThreadPool.execute(task);
-            // if (monitor != null) {
-            // monitor.increment(MetricsName.checkpointing_added_to_storage_queue.toString(), 1);
-            // }
         } catch (RejectedExecutionException e) {
-            // if (monitor != null) {
-            // monitor.increment(MetricsName.checkpointing_dropped_from_storage_queue.toString(), 1);
-            // }
+            CheckpointingMetrics.rejectedStorageTask();
             storageCallback.storageOperationResult(StorageResultCode.FAILURE,
                     "Storage checkpoint queue is full. Removed an old task to handle latest task. Remaining capacity for task queue is ["
                             + storageThreadPool.getQueue().remainingCapacity() + "] ; number of elements is ["
@@ -243,6 +232,7 @@ public final class SafeKeeper implements CheckpointingFramework {
         Future<byte[]> fetched = fetchingThreadPool.submit(new FetchTask(stateStorage, key));
         try {
             result = fetched.get(fetchingMaxWaitMs, TimeUnit.MILLISECONDS);
+            CheckpointingMetrics.fetchedCheckpoint();
             fetchingCurrentConsecutiveFailures.set(0);
             return result;
         } catch (TimeoutException te) {
@@ -257,6 +247,7 @@ public final class SafeKeeper implements CheckpointingFramework {
             logger.error("Cannot fetch checkpoint from backend for key [{}] due to {}", key.getStringRepresentation(),
                     e.getCause().getClass().getName() + "/" + e.getCause().getMessage());
         }
+        CheckpointingMetrics.checkpointFetchFailed();
         if (fetchingCurrentConsecutiveFailures.incrementAndGet() == fetchingMaxConsecutiveFailuresBeforeDisabling) {
             logger.trace(
                     "Due to {} successive checkpoint fetching failures, fetching is temporarily disabled for {} ms",

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/80dfe403/subprojects/s4-core/src/main/java/org/apache/s4/core/util/S4Metrics.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/util/S4Metrics.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/util/S4Metrics.java
new file mode 100644
index 0000000..d0ffc0d
--- /dev/null
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/util/S4Metrics.java
@@ -0,0 +1,143 @@
+package org.apache.s4.core.util;
+
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.s4.base.Emitter;
+import org.apache.s4.comm.topology.Assignment;
+import org.apache.s4.core.ProcessingElement;
+import org.apache.s4.core.Receiver;
+import org.apache.s4.core.RemoteSender;
+import org.apache.s4.core.Sender;
+import org.apache.s4.core.Stream;
+
+import com.beust.jcommander.internal.Lists;
+import com.google.common.cache.LoadingCache;
+import com.google.inject.Inject;
+import com.yammer.metrics.Metrics;
+import com.yammer.metrics.core.Gauge;
+import com.yammer.metrics.core.Meter;
+
+public class S4Metrics {
+
+    @Inject
+    Emitter emitter;
+
+    @Inject
+    Assignment assignment;
+
+    static List<Meter> partitionSenderMeters = Lists.newArrayList();
+
+    private static Meter eventMeter = Metrics.newMeter(Receiver.class, "received-events", "event-count",
+            TimeUnit.SECONDS);
+    private static Meter bytesMeter = Metrics.newMeter(Receiver.class, "received-bytes", "bytes-count",
+            TimeUnit.SECONDS);
+
+    private static Meter[] senderMeters;
+
+    private static Map<String, Meter> dequeuingStreamMeters = Maps.newHashMap();
+    private static Map<String, Meter> droppedStreamMeters = Maps.newHashMap();
+
+    private static Map<String, Meter[]> remoteSenderMeters = Maps.newHashMap();
+
+    @Inject
+    private void init() {
+        senderMeters = new Meter[emitter.getPartitionCount()];
+        int localPartitionId = assignment.assignClusterNode().getPartition();
+        for (int i = 0; i < senderMeters.length; i++) {
+            senderMeters[i] = Metrics.newMeter(Sender.class, "sender", "sent-to-"
+                    + ((i == localPartitionId) ? i + "(local)" : i), TimeUnit.SECONDS);
+        }
+    }
+
+    public static void createCacheGauges(final LoadingCache<String, ProcessingElement> cache) {
+        Metrics.newGauge(ProcessingElement.class, "PE-cache-entries", new Gauge<Long>() {
+
+            @Override
+            public Long value() {
+                return cache.size();
+            }
+        });
+        Metrics.newGauge(ProcessingElement.class, "PE-cache-evictions", new Gauge<Long>() {
+
+            @Override
+            public Long value() {
+                return cache.stats().evictionCount();
+            }
+        });
+        Metrics.newGauge(ProcessingElement.class, "PE-cache-misses", new Gauge<Long>() {
+
+            @Override
+            public Long value() {
+                return cache.stats().missCount();
+            }
+        });
+    }
+
+    public static void receivedEvent(int bytes) {
+        eventMeter.mark();
+        bytesMeter.mark(bytes);
+    }
+
+    public static void sentEvent(int partition) {
+        senderMeters[partition].mark();
+    }
+
+    public static void createStreamMeters(String name) {
+        // TODO avoid maps to avoid map lookups?
+        dequeuingStreamMeters.put(name,
+                Metrics.newMeter(Stream.class, "dequeued@" + name, "dequeued", TimeUnit.SECONDS));
+        droppedStreamMeters.put(name, Metrics.newMeter(Stream.class, "dropped@" + name, "dropped", TimeUnit.SECONDS));
+
+    }
+
+    public static void dequeuedEvent(String name) {
+        dequeuingStreamMeters.get(name).mark();
+    }
+
+    public static void createRemoteStreamMeters(String remoteClusterName, int partitionCount) {
+        Meter[] meters = new Meter[partitionCount];
+        for (int i = 0; i < partitionCount; i++) {
+            meters[i] = Metrics.newMeter(RemoteSender.class, "remote-sender@" + remoteClusterName + "@partition-" + i,
+                    "sent", TimeUnit.SECONDS);
+        }
+        synchronized (remoteSenderMeters) {
+            remoteSenderMeters.put(remoteClusterName, meters);
+        }
+
+    }
+
+    public static void sentEventToRemoteCluster(String remoteClusterName, int partition) {
+        remoteSenderMeters.get(remoteClusterName)[partition].mark();
+    }
+
+    public static class CheckpointingMetrics {
+
+        static Meter rejectedSerializationTask = Metrics.newMeter(CheckpointingMetrics.class, "checkpointing",
+                "rejected-serialization-task", TimeUnit.SECONDS);
+        static Meter rejectedStorageTask = Metrics.newMeter(CheckpointingMetrics.class, "checkpointing",
+                "rejected-storage-task", TimeUnit.SECONDS);
+        static Meter fetchedCheckpoint = Metrics.newMeter(CheckpointingMetrics.class, "checkpointing",
+                "fetched-checkpoint", TimeUnit.SECONDS);
+        static Meter fetchedCheckpointFailure = Metrics.newMeter(CheckpointingMetrics.class, "checkpointing",
+                "fetched-checkpoint-failed", TimeUnit.SECONDS);
+
+        public static void rejectedSerializationTask() {
+            rejectedSerializationTask.mark();
+        }
+
+        public static void rejectedStorageTask() {
+            rejectedStorageTask.mark();
+        }
+
+        public static void fetchedCheckpoint() {
+            fetchedCheckpoint.mark();
+        }
+
+        public static void checkpointFetchFailed() {
+            fetchedCheckpointFailure.mark();
+        }
+    }
+}