Posted to commits@storm.apache.org by bo...@apache.org on 2015/10/20 16:51:48 UTC

[1/5] storm git commit: STORM-1118: Added test to compare latency vs throughput in storm.

Repository: storm
Updated Branches:
  refs/heads/master 647d672b5 -> 4b4bbd08b


STORM-1118: Added test to compare latency vs throughput in storm.

Also added a metrics gathering server for testing, an HdrHistogram-based metric, and a CPU utilization metric using Sigar.


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/c356c35a
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/c356c35a
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/c356c35a

Branch: refs/heads/master
Commit: c356c35aeab17bc14861bf7efa8ddedd3f257741
Parents: 1d393ee
Author: Robert (Bobby) Evans <ev...@yahoo-inc.com>
Authored: Fri Oct 16 14:57:23 2015 -0500
Committer: Robert (Bobby) Evans <ev...@yahoo-inc.com>
Committed: Sat Oct 17 13:59:23 2015 -0500

----------------------------------------------------------------------
 LICENSE                                         |   9 +
 examples/storm-starter/pom.xml                  |   9 +
 .../jvm/storm/starter/ThroughputVsLatency.java  | 376 +++++++++++++++++++
 external/storm-metrics/pom.xml                  |  58 +++
 .../metrics/hdrhistogram/HistogramMetric.java   |  79 ++++
 .../apache/storm/metrics/sigar/CPUMetric.java   |  60 +++
 .../resources/libsigar-amd64-freebsd-6.so       | Bin 0 -> 210641 bytes
 .../resources/resources/libsigar-amd64-linux.so | Bin 0 -> 246605 bytes
 .../resources/libsigar-amd64-solaris.so         | Bin 0 -> 251360 bytes
 .../resources/libsigar-ia64-hpux-11.sl          | Bin 0 -> 577452 bytes
 .../resources/resources/libsigar-ia64-linux.so  | Bin 0 -> 494929 bytes
 .../resources/resources/libsigar-pa-hpux-11.sl  | Bin 0 -> 516096 bytes
 .../resources/resources/libsigar-ppc-aix-5.so   | Bin 0 -> 400925 bytes
 .../resources/resources/libsigar-ppc-linux.so   | Bin 0 -> 258547 bytes
 .../resources/resources/libsigar-ppc64-aix-5.so | Bin 0 -> 425077 bytes
 .../resources/resources/libsigar-ppc64-linux.so | Bin 0 -> 330767 bytes
 .../resources/resources/libsigar-s390x-linux.so | Bin 0 -> 269932 bytes
 .../resources/libsigar-sparc-solaris.so         | Bin 0 -> 285004 bytes
 .../resources/libsigar-sparc64-solaris.so       | Bin 0 -> 261896 bytes
 .../resources/libsigar-universal-macosx.dylib   | Bin 0 -> 377668 bytes
 .../resources/libsigar-universal64-macosx.dylib | Bin 0 -> 397440 bytes
 .../resources/libsigar-x86-freebsd-5.so         | Bin 0 -> 179751 bytes
 .../resources/libsigar-x86-freebsd-6.so         | Bin 0 -> 179379 bytes
 .../resources/resources/libsigar-x86-linux.so   | Bin 0 -> 233385 bytes
 .../resources/resources/libsigar-x86-solaris.so | Bin 0 -> 242880 bytes
 .../resources/resources/sigar-amd64-winnt.dll   | Bin 0 -> 402432 bytes
 .../resources/resources/sigar-x86-winnt.dll     | Bin 0 -> 266240 bytes
 .../resources/resources/sigar-x86-winnt.lib     | Bin 0 -> 99584 bytes
 pom.xml                                         |   7 +
 .../src/clj/backtype/storm/daemon/nimbus.clj    |   2 +-
 .../metric/HttpForwardingMetricsConsumer.java   |  81 ++++
 .../metric/HttpForwardingMetricsServer.java     | 119 ++++++
 32 files changed, 799 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/c356c35a/LICENSE
----------------------------------------------------------------------
diff --git a/LICENSE b/LICENSE
index 8755d1b..5eedc94 100644
--- a/LICENSE
+++ b/LICENSE
@@ -202,6 +202,15 @@
    limitations under the License.
 
 -----------------------------------------------------------------------
+For Hyperic Sigar (external/storm-metrics/src/main/resources/resources)
+
+https://github.com/hyperic/sigar
+
+                              Apache License
+                        Version 2.0, January 2004
+                     http://www.apache.org/licenses/
+
+-----------------------------------------------------------------------
 
 For jQuery 1.11.1 (storm-core/src/ui/public/js/jquery-1.11.1.min.js)
 

http://git-wip-us.apache.org/repos/asf/storm/blob/c356c35a/examples/storm-starter/pom.xml
----------------------------------------------------------------------
diff --git a/examples/storm-starter/pom.xml b/examples/storm-starter/pom.xml
index 0f82c33..ac6472e 100644
--- a/examples/storm-starter/pom.xml
+++ b/examples/storm-starter/pom.xml
@@ -33,6 +33,10 @@
 
   <dependencies>
     <dependency>
+      <groupId>org.hdrhistogram</groupId>
+      <artifactId>HdrHistogram</artifactId>
+    </dependency>
+    <dependency>
       <groupId>junit</groupId>
       <artifactId>junit</artifactId>
       <scope>test</scope>
@@ -105,6 +109,11 @@
     </dependency>
     <dependency>
       <groupId>org.apache.storm</groupId>
+      <artifactId>storm-metrics</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.storm</groupId>
       <artifactId>storm-kafka</artifactId>
       <version>${project.version}</version>
       <scope>provided</scope>

http://git-wip-us.apache.org/repos/asf/storm/blob/c356c35a/examples/storm-starter/src/jvm/storm/starter/ThroughputVsLatency.java
----------------------------------------------------------------------
diff --git a/examples/storm-starter/src/jvm/storm/starter/ThroughputVsLatency.java b/examples/storm-starter/src/jvm/storm/starter/ThroughputVsLatency.java
new file mode 100644
index 0000000..0ef1256
--- /dev/null
+++ b/examples/storm-starter/src/jvm/storm/starter/ThroughputVsLatency.java
@@ -0,0 +1,376 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package storm.starter;
+
+import backtype.storm.Config;
+import backtype.storm.LocalCluster;
+import backtype.storm.StormSubmitter;
+import backtype.storm.metric.HttpForwardingMetricsServer;
+import backtype.storm.metric.HttpForwardingMetricsConsumer;
+import backtype.storm.metric.api.IMetric;
+import backtype.storm.metric.api.IMetricsConsumer.TaskInfo;
+import backtype.storm.metric.api.IMetricsConsumer.DataPoint;
+import backtype.storm.generated.*;
+import backtype.storm.spout.SpoutOutputCollector;
+import backtype.storm.task.TopologyContext;
+import backtype.storm.topology.BasicOutputCollector;
+import backtype.storm.topology.IRichBolt;
+import backtype.storm.topology.OutputFieldsDeclarer;
+import backtype.storm.topology.TopologyBuilder;
+import backtype.storm.topology.base.BaseBasicBolt;
+import backtype.storm.topology.base.BaseRichSpout;
+import backtype.storm.tuple.Fields;
+import backtype.storm.tuple.Tuple;
+import backtype.storm.tuple.Values;
+import backtype.storm.utils.NimbusClient;
+import backtype.storm.utils.Utils;
+import backtype.storm.StormSubmitter;
+
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Random;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.storm.metrics.hdrhistogram.HistogramMetric;
+import org.HdrHistogram.Histogram;
+
+/**
+ * WordCount but the spout goes at a predefined rate and we collect
+ * proper latency statistics.
+ */
+public class ThroughputVsLatency {
+  private static class SentWithTime {
+    public final String sentence;
+    public final long time;
+
+    SentWithTime(String sentence, long time) {
+        this.sentence = sentence;
+        this.time = time;
+    }
+  }
+
+  public static class C {
+    LocalCluster _local = null;
+    Nimbus.Client _client = null;
+
+    public C(Map conf) {
+      Map clusterConf = Utils.readStormConfig();
+      if (conf != null) {
+        clusterConf.putAll(conf);
+      }
+      Boolean isLocal = (Boolean)clusterConf.get("run.local");
+      if (isLocal != null && isLocal) {
+        _local = new LocalCluster();
+      } else {
+        _client = NimbusClient.getConfiguredClient(clusterConf).getClient();
+      }
+    }
+
+    public ClusterSummary getClusterInfo() throws Exception {
+      if (_local != null) {
+        return _local.getClusterInfo();
+      } else {
+        return _client.getClusterInfo();
+      }
+    }
+
+    public TopologyInfo getTopologyInfo(String id) throws Exception {
+      if (_local != null) {
+        return _local.getTopologyInfo(id);
+      } else {
+        return _client.getTopologyInfo(id);
+      }
+    }
+
+    public void killTopologyWithOpts(String name, KillOptions opts) throws Exception {
+      if (_local != null) {
+        _local.killTopologyWithOpts(name, opts);
+      } else {
+        _client.killTopologyWithOpts(name, opts);
+      }
+    }
+
+    public void submitTopology(String name, Map stormConf, StormTopology topology) throws Exception {
+      if (_local != null) {
+        _local.submitTopology(name, stormConf, topology);
+      } else {
+        StormSubmitter.submitTopology(name, stormConf, topology);
+      }
+    }
+  }
+
+  public static class FastRandomSentenceSpout extends BaseRichSpout {
+    static final String[] SENTENCES = new String[]{ "the cow jumped over the moon", "an apple a day keeps the doctor away",
+          "four score and seven years ago", "snow white and the seven dwarfs", "i am at two with nature" };
+
+    SpoutOutputCollector _collector;
+    long _periodNano;
+    long _emitAmount;
+    Random _rand;
+    long _nextEmitTime;
+    long _emitsLeft;
+    HistogramMetric _histo;
+
+    public FastRandomSentenceSpout(long ratePerSecond) {
+        if (ratePerSecond > 0) {
+            _periodNano = Math.max(1, 1000000000/ratePerSecond);
+            _emitAmount = Math.max(1, (long)((ratePerSecond / 1000000000.0) * _periodNano));
+        } else {
+            _periodNano = Long.MAX_VALUE - 1;
+            _emitAmount = 1;
+        }
+    }
+
+    @Override
+    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
+      _collector = collector;
+      _rand = ThreadLocalRandom.current();
+      _nextEmitTime = System.nanoTime();
+      _emitsLeft = _emitAmount;
+      _histo = new HistogramMetric(3600000000000L, 3);
+      context.registerMetric("comp-lat-histo", _histo, 10); //Update every 10 seconds, so we are not too far behind
+    }
+
+    @Override
+    public void nextTuple() {
+      if (_emitsLeft <= 0 && _nextEmitTime <= System.nanoTime()) {
+          _emitsLeft = _emitAmount;
+          _nextEmitTime = _nextEmitTime + _periodNano;
+      }
+
+      if (_emitsLeft > 0) {
+          String sentence = SENTENCES[_rand.nextInt(SENTENCES.length)];
+          _collector.emit(new Values(sentence), new SentWithTime(sentence, _nextEmitTime - _periodNano));
+          _emitsLeft--;
+      }
+    }
+
+    @Override
+    public void ack(Object id) {
+      long end = System.nanoTime();
+      SentWithTime st = (SentWithTime)id;
+      _histo.recordValue(end-st.time);
+    }
+
+    @Override
+    public void fail(Object id) {
+      SentWithTime st = (SentWithTime)id;
+      _collector.emit(new Values(st.sentence), id);
+    }
+
+    @Override
+    public void declareOutputFields(OutputFieldsDeclarer declarer) {
+      declarer.declare(new Fields("sentence"));
+    }
+  }
+
+  public static class SplitSentence extends BaseBasicBolt {
+    @Override
+    public void execute(Tuple tuple, BasicOutputCollector collector) {
+      String sentence = tuple.getString(0);
+      for (String word: sentence.split("\\s+")) {
+          collector.emit(new Values(word, 1));
+      }
+    }
+
+    @Override
+    public void declareOutputFields(OutputFieldsDeclarer declarer) {
+      declarer.declare(new Fields("word", "count"));
+    }
+  }
+
+  public static class WordCount extends BaseBasicBolt {
+    Map<String, Integer> counts = new HashMap<String, Integer>();
+
+    @Override
+    public void execute(Tuple tuple, BasicOutputCollector collector) {
+      String word = tuple.getString(0);
+      Integer count = counts.get(word);
+      if (count == null)
+        count = 0;
+      count++;
+      counts.put(word, count);
+      collector.emit(new Values(word, count));
+    }
+
+    @Override
+    public void declareOutputFields(OutputFieldsDeclarer declarer) {
+      declarer.declare(new Fields("word", "count"));
+    }
+  }
+
+  private static final Histogram _histo = new Histogram(3600000000000L, 3);
+  private static final AtomicLong _systemCPU = new AtomicLong(0);
+  private static final AtomicLong _userCPU = new AtomicLong(0);
+  private static final AtomicLong _gcCount = new AtomicLong(0);
+  private static final AtomicLong _gcMs = new AtomicLong(0);
+
+  private static long _prev_acked = 0;
+  private static long _prev_uptime = 0;
+
+  public static void printMetrics(C client, String name) throws Exception {
+    ClusterSummary summary = client.getClusterInfo();
+    String id = null;
+    for (TopologySummary ts: summary.get_topologies()) {
+      if (name.equals(ts.get_name())) {
+        id = ts.get_id();
+      }
+    }
+    if (id == null) {
+      throw new Exception("Could not find a topology named "+name);
+    }
+    TopologyInfo info = client.getTopologyInfo(id);
+    int uptime = info.get_uptime_secs();
+    long acked = 0;
+    long failed = 0;
+    for (ExecutorSummary exec: info.get_executors()) {
+      if ("spout".equals(exec.get_component_id())) {
+        SpoutStats stats = exec.get_stats().get_specific().get_spout();
+        Map<String, Long> failedMap = stats.get_failed().get(":all-time");
+        Map<String, Long> ackedMap = stats.get_acked().get(":all-time");
+        for (String key: ackedMap.keySet()) {
+          if (failedMap != null) {
+              Long tmp = failedMap.get(key);
+              if (tmp != null) {
+                  failed += tmp;
+              }
+          }
+          long ackVal = ackedMap.get(key);
+          acked += ackVal;
+        }
+      }
+    }
+    long ackedThisTime = acked - _prev_acked;
+    long thisTime = uptime - _prev_uptime;
+    long nnpct, nnnpct, min, max;
+    double mean, stddev;
+    synchronized(_histo) {
+      nnpct = _histo.getValueAtPercentile(99.0);
+      nnnpct = _histo.getValueAtPercentile(99.9);
+      min = _histo.getMinValue();
+      max = _histo.getMaxValue();
+      mean = _histo.getMean();
+      stddev = _histo.getStdDeviation();
+      _histo.reset();
+    }
+    long user = _userCPU.getAndSet(0);
+    long sys = _systemCPU.getAndSet(0);
+    long gc = _gcMs.getAndSet(0);
+    System.out.printf("uptime: %,4d acked: %,8d acked/sec: %,8.2f failed: %,8d " +
+                      "99%%: %,15d 99.9%%: %,15d min: %,15d max: %,15d mean: %,15.2f stddev: %,15.2f user: %,10d sys: %,10d gc: %,10d\n",
+                       uptime, ackedThisTime, (((double)ackedThisTime)/thisTime), failed, nnpct, nnnpct,
+                       min, max, mean, stddev, user, sys, gc);
+    _prev_uptime = uptime;
+    _prev_acked = acked;
+  }
+
+  public static void kill(C client, String name) throws Exception {
+    KillOptions opts = new KillOptions();
+    opts.set_wait_secs(0);
+    client.killTopologyWithOpts(name, opts);
+  }
+
+  public static void main(String[] args) throws Exception {
+    long ratePerSecond = 500;
+    if (args != null && args.length > 0) {
+        ratePerSecond = Long.valueOf(args[0]);
+    }
+
+    int parallelism = 4;
+    if (args != null && args.length > 1) {
+        parallelism = Integer.valueOf(args[1]);
+    }
+
+    int numMins = 5;
+    if (args != null && args.length > 2) {
+        numMins = Integer.valueOf(args[2]);
+    }
+
+    String name = "wc-test";
+    if (args != null && args.length > 3) {
+        name = args[3];
+    }
+
+    Config conf = new Config();
+    HttpForwardingMetricsServer metricServer = new HttpForwardingMetricsServer(conf) {
+        @Override
+        public void handle(TaskInfo taskInfo, Collection<DataPoint> dataPoints) {
+            for (DataPoint dp: dataPoints) {
+                if ("comp-lat-histo".equals(dp.name) && dp.value instanceof Histogram) {
+                    synchronized(_histo) {
+                        _histo.add((Histogram)dp.value);
+                    }
+                } else if ("CPU".equals(dp.name) && dp.value instanceof Map) {
+                   Map<Object, Object> m = (Map<Object, Object>)dp.value;
+                   Object sys = m.get("sys-ms");
+                   if (sys instanceof Number) {
+                       _systemCPU.getAndAdd(((Number)sys).longValue());
+                   }
+                   Object user = m.get("user-ms");
+                   if (user instanceof Number) {
+                       _userCPU.getAndAdd(((Number)user).longValue());
+                   }
+                } else if (dp.name.startsWith("GC/") && dp.value instanceof Map) {
+                   Map<Object, Object> m = (Map<Object, Object>)dp.value;
+                   Object count = m.get("count");
+                   if (count instanceof Number) {
+                       _gcCount.getAndAdd(((Number)count).longValue());
+                   }
+                   Object time = m.get("timeMs");
+                   if (time instanceof Number) {
+                       _gcMs.getAndAdd(((Number)time).longValue());
+                   }
+                }
+            }
+        }
+    };
+
+    metricServer.serve();
+    String url = metricServer.getUrl();
+
+    conf.setNumWorkers(parallelism);
+    conf.registerMetricsConsumer(backtype.storm.metric.HttpForwardingMetricsConsumer.class, url, 1);
+    Map<String, String> workerMetrics = new HashMap<String, String>();
+    workerMetrics.put("CPU", "org.apache.storm.metrics.sigar.CPUMetric");
+    conf.put(Config.TOPOLOGY_WORKER_METRICS, workerMetrics);
+    conf.put(Config.TOPOLOGY_BUILTIN_METRICS_BUCKET_SIZE_SECS, 10);
+
+    TopologyBuilder builder = new TopologyBuilder();
+
+    int numEach = 4 * parallelism;
+    builder.setSpout("spout", new FastRandomSentenceSpout(ratePerSecond/numEach), numEach);
+
+    builder.setBolt("split", new SplitSentence(), numEach).shuffleGrouping("spout");
+    builder.setBolt("count", new WordCount(), numEach).fieldsGrouping("split", new Fields("word"));
+
+    C cluster = new C(conf);
+    try {
+        cluster.submitTopology(name, conf, builder.createTopology());
+
+        for (int i = 0; i < numMins * 2; i++) {
+            Thread.sleep(30 * 1000);
+            printMetrics(cluster, name);
+        }
+    } finally {
+        kill(cluster, name);
+    }
+    System.exit(0);
+  }
+}
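
The pacing arithmetic in the FastRandomSentenceSpout constructor can be checked in isolation. The following is a minimal sketch and not part of the commit: the class name SpoutPacing is hypothetical, and only the two formulas are taken from the diff above.

```java
// Hypothetical stand-alone copy of the pacing arithmetic from
// FastRandomSentenceSpout's constructor (names here are illustrative).
final class SpoutPacing {
    final long periodNano;  // time between emit batches, in nanoseconds
    final long emitAmount;  // tuples allowed per batch

    SpoutPacing(long ratePerSecond) {
        if (ratePerSecond > 0) {
            // One batch per period; the period never drops below 1 ns.
            periodNano = Math.max(1, 1_000_000_000L / ratePerSecond);
            // Above 1e9 tuples/sec the period is pinned at 1 ns, so the
            // batch size grows instead.
            emitAmount = Math.max(1, (long) ((ratePerSecond / 1_000_000_000.0) * periodNano));
        } else {
            // A non-positive rate means "effectively never emit again".
            periodNano = Long.MAX_VALUE - 1;
            emitAmount = 1;
        }
    }

    public static void main(String[] args) {
        for (long rate : new long[] {500L, 31L, 4_000_000_000L}) {
            SpoutPacing p = new SpoutPacing(rate);
            System.out.printf("rate=%d periodNano=%d emitAmount=%d%n",
                              rate, p.periodNano, p.emitAmount);
        }
    }
}
```

Note that main() passes ratePerSecond/numEach to each spout using integer division. With the defaults (rate 500, parallelism 4, so numEach is 16) each spout is given 31 tuples per second, which makes the aggregate target 496 per second rather than 500.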

http://git-wip-us.apache.org/repos/asf/storm/blob/c356c35a/external/storm-metrics/pom.xml
----------------------------------------------------------------------
diff --git a/external/storm-metrics/pom.xml b/external/storm-metrics/pom.xml
new file mode 100644
index 0000000..e8b6950
--- /dev/null
+++ b/external/storm-metrics/pom.xml
@@ -0,0 +1,58 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements.  See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License.  You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+  <parent>
+      <artifactId>storm</artifactId>
+      <groupId>org.apache.storm</groupId>
+      <version>0.11.0-SNAPSHOT</version>
+      <relativePath>../../pom.xml</relativePath>
+  </parent>
+
+  <groupId>org.apache.storm</groupId>
+  <artifactId>storm-metrics</artifactId>
+  <packaging>jar</packaging>
+
+  <name>storm-metrics</name>
+
+  <dependencies>
+    <dependency>
+      <groupId>org.hdrhistogram</groupId>
+      <artifactId>HdrHistogram</artifactId>
+    </dependency>
+    <dependency>
+      <!-- DO NOT CHANGE THE VERSION OF THIS PACKAGE WITHOUT UPDATING THE NATIVE LIBRARIES in src/main/resources/resources -->
+      <groupId>org.fusesource</groupId>
+      <artifactId>sigar</artifactId>
+      <version>1.6.4</version>
+      <exclusions>
+        <exclusion>
+          <groupId>log4j</groupId>
+          <artifactId>log4j</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.storm</groupId>
+      <artifactId>storm-core</artifactId>
+      <version>${project.version}</version>
+      <!-- keep storm out of the jar-with-dependencies -->
+      <scope>provided</scope>
+    </dependency>
+  </dependencies>
+</project>

http://git-wip-us.apache.org/repos/asf/storm/blob/c356c35a/external/storm-metrics/src/main/java/org/apache/storm/metrics/hdrhistogram/HistogramMetric.java
----------------------------------------------------------------------
diff --git a/external/storm-metrics/src/main/java/org/apache/storm/metrics/hdrhistogram/HistogramMetric.java b/external/storm-metrics/src/main/java/org/apache/storm/metrics/hdrhistogram/HistogramMetric.java
new file mode 100644
index 0000000..4adc500
--- /dev/null
+++ b/external/storm-metrics/src/main/java/org/apache/storm/metrics/hdrhistogram/HistogramMetric.java
@@ -0,0 +1,79 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.metrics.hdrhistogram;
+
+import backtype.storm.metric.api.IMetric;
+import org.HdrHistogram.Histogram;
+
+/**
+ * A metric wrapping an HdrHistogram.
+ */
+public class HistogramMetric implements IMetric {
+    private final Histogram _histo;
+
+
+    public HistogramMetric(final int numberOfSignificantValueDigits) {
+        this(null, numberOfSignificantValueDigits);
+    }
+
+    public HistogramMetric(Long highestTrackableValue, final int numberOfSignificantValueDigits) {
+        this(null, highestTrackableValue, numberOfSignificantValueDigits);
+    }
+
+    /**
+     * (From the Constructor of Histogram)
+     * Construct a Histogram given the Lowest and Highest values to be tracked and a number of significant
+     * decimal digits. Providing a lowestDiscernibleValue is useful in situations where the units used
+     * for the histogram's values are much smaller than the minimal accuracy required. E.g. when tracking
+     * time values stated in nanosecond units, where the minimal accuracy required is a microsecond, the
+     * proper value for lowestDiscernibleValue would be 1000.
+     *
+     * @param lowestDiscernibleValue         The lowest value that can be discerned (distinguished from 0) by the
+     *                                       histogram. Must be a positive integer that is {@literal >=} 1. May be
+     *                                       internally rounded down to nearest power of 2
+     *                                       (if null 1 is used).
+     * @param highestTrackableValue          The highest value to be tracked by the histogram. Must be a positive
+     *                                       integer that is {@literal >=} (2 * lowestDiscernibleValue).
+     *                                       (if null 2 * lowestDiscernibleValue is used and auto-resize is enabled)
+     * @param numberOfSignificantValueDigits Specifies the precision to use. This is the number of significant
+     *                                       decimal digits to which the histogram will maintain value resolution
+     *                                       and separation. Must be a non-negative integer between 0 and 5.
+     */
+    public HistogramMetric(Long lowestDiscernibleValue, Long highestTrackableValue,
+                     final int numberOfSignificantValueDigits) {
+        boolean autoResize = false;
+        if (lowestDiscernibleValue == null) lowestDiscernibleValue = 1L;
+        if (highestTrackableValue == null) {
+            highestTrackableValue = 2 * lowestDiscernibleValue;
+            autoResize = true;
+        }
+        _histo = new Histogram(lowestDiscernibleValue, highestTrackableValue, numberOfSignificantValueDigits);
+        if (autoResize) _histo.setAutoResize(true);
+    }
+
+    public void recordValue(long val) {
+        _histo.recordValue(val);
+    }
+
+    @Override
+    public Object getValueAndReset() {
+        Histogram copy = _histo.copy();
+        _histo.reset();
+        return copy;
+    }
+}
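
HistogramMetric.getValueAndReset() hands out a copy of the interval's data and then clears its own state, and the handle() override in ThroughputVsLatency adds each received copy into a running total. The sketch below illustrates that snapshot-and-merge pattern using only the JDK. It is an assumption-laden stand-in: IntervalBuckets and its plain long[] buckets are hypothetical and are not the HdrHistogram API.

```java
import java.util.Arrays;

// Hypothetical stand-in for a histogram-style metric; a plain bucket
// array replaces HdrHistogram so the example needs no dependencies.
final class IntervalBuckets {
    private final long[] counts;

    IntervalBuckets(int numBuckets) {
        counts = new long[numBuckets];
    }

    void recordValue(int bucket) {
        counts[bucket]++;
    }

    // Same shape as IMetric.getValueAndReset(): return a copy of this
    // interval's data, then clear the live state for the next interval.
    long[] getValueAndReset() {
        long[] copy = Arrays.copyOf(counts, counts.length);
        Arrays.fill(counts, 0L);
        return copy;
    }

    // Receiver side: fold one interval's snapshot into a running total,
    // analogous to the _histo.add(...) call in the metrics server.
    static void addInto(long[] total, long[] interval) {
        for (int i = 0; i < interval.length; i++) {
            total[i] += interval[i];
        }
    }
}
```

Because every snapshot covers only the values recorded since the previous one, the receiver can sum snapshots from many tasks without counting any value twice.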

http://git-wip-us.apache.org/repos/asf/storm/blob/c356c35a/external/storm-metrics/src/main/java/org/apache/storm/metrics/sigar/CPUMetric.java
----------------------------------------------------------------------
diff --git a/external/storm-metrics/src/main/java/org/apache/storm/metrics/sigar/CPUMetric.java b/external/storm-metrics/src/main/java/org/apache/storm/metrics/sigar/CPUMetric.java
new file mode 100644
index 0000000..a3addc9
--- /dev/null
+++ b/external/storm-metrics/src/main/java/org/apache/storm/metrics/sigar/CPUMetric.java
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.metrics.sigar;
+
+import org.hyperic.sigar.Sigar;
+import org.hyperic.sigar.ProcCpu;
+
+import backtype.storm.metric.api.IMetric;
+
+import java.util.HashMap;
+
+/**
+ * A metric using Sigar to get User and System CPU utilization for a worker.
+ */
+public class CPUMetric implements IMetric {
+    private long _prevUser = 0;
+    private long _prevSys = 0;
+    private final Sigar _sigar;
+    private final long _pid;
+
+    public CPUMetric() {
+        _sigar = new Sigar();
+        _pid = _sigar.getPid();
+    }
+
+    @Override
+    public Object getValueAndReset() {
+        try {
+            ProcCpu cpu = _sigar.getProcCpu(_pid);
+            long userTotal = cpu.getUser();
+            long sysTotal = cpu.getSys();
+            long user = userTotal - _prevUser;
+            long sys = sysTotal - _prevSys;
+            _prevUser = userTotal;
+            _prevSys = sysTotal;
+
+            HashMap<String, Long> ret = new HashMap<String, Long>();
+            ret.put("user-ms", user);
+            ret.put("sys-ms", sys);
+            return ret;
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+    }
+}
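
CPUMetric turns cumulative per-process CPU totals into per-interval deltas by remembering the previous reading. The sketch below isolates that logic so it can be exercised without the native Sigar library. It is illustrative only: the class name CumulativeToDelta is hypothetical, and the two LongSupplier arguments are assumed stand-ins for the cumulative user and system times that the real metric reads from Sigar.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.LongSupplier;

// Hypothetical stand-in for CPUMetric's delta computation; the suppliers
// replace the Sigar calls with any source of cumulative millisecond totals.
final class CumulativeToDelta {
    private long prevUser = 0;
    private long prevSys = 0;
    private final LongSupplier userTotalMs;
    private final LongSupplier sysTotalMs;

    CumulativeToDelta(LongSupplier userTotalMs, LongSupplier sysTotalMs) {
        this.userTotalMs = userTotalMs;
        this.sysTotalMs = sysTotalMs;
    }

    // Report what was consumed since the previous call, then remember
    // the current totals as the new baseline.
    Map<String, Long> getValueAndReset() {
        long userTotal = userTotalMs.getAsLong();
        long sysTotal = sysTotalMs.getAsLong();
        Map<String, Long> ret = new HashMap<>();
        ret.put("user-ms", userTotal - prevUser);
        ret.put("sys-ms", sysTotal - prevSys);
        prevUser = userTotal;
        prevSys = sysTotal;
        return ret;
    }
}
```

Since the baselines start at 0, the first call reports everything accumulated since the process started, which can make the first interval look larger than later ones.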

http://git-wip-us.apache.org/repos/asf/storm/blob/c356c35a/external/storm-metrics/src/main/resources/resources/libsigar-amd64-freebsd-6.so
----------------------------------------------------------------------
diff --git a/external/storm-metrics/src/main/resources/resources/libsigar-amd64-freebsd-6.so b/external/storm-metrics/src/main/resources/resources/libsigar-amd64-freebsd-6.so
new file mode 100644
index 0000000..3e94f0d
Binary files /dev/null and b/external/storm-metrics/src/main/resources/resources/libsigar-amd64-freebsd-6.so differ

http://git-wip-us.apache.org/repos/asf/storm/blob/c356c35a/external/storm-metrics/src/main/resources/resources/libsigar-amd64-linux.so
----------------------------------------------------------------------
diff --git a/external/storm-metrics/src/main/resources/resources/libsigar-amd64-linux.so b/external/storm-metrics/src/main/resources/resources/libsigar-amd64-linux.so
new file mode 100644
index 0000000..5a2e4c2
Binary files /dev/null and b/external/storm-metrics/src/main/resources/resources/libsigar-amd64-linux.so differ

http://git-wip-us.apache.org/repos/asf/storm/blob/c356c35a/external/storm-metrics/src/main/resources/resources/libsigar-amd64-solaris.so
----------------------------------------------------------------------
diff --git a/external/storm-metrics/src/main/resources/resources/libsigar-amd64-solaris.so b/external/storm-metrics/src/main/resources/resources/libsigar-amd64-solaris.so
new file mode 100644
index 0000000..6396482
Binary files /dev/null and b/external/storm-metrics/src/main/resources/resources/libsigar-amd64-solaris.so differ

http://git-wip-us.apache.org/repos/asf/storm/blob/c356c35a/external/storm-metrics/src/main/resources/resources/libsigar-ia64-hpux-11.sl
----------------------------------------------------------------------
diff --git a/external/storm-metrics/src/main/resources/resources/libsigar-ia64-hpux-11.sl b/external/storm-metrics/src/main/resources/resources/libsigar-ia64-hpux-11.sl
new file mode 100755
index 0000000..d92ea4a
Binary files /dev/null and b/external/storm-metrics/src/main/resources/resources/libsigar-ia64-hpux-11.sl differ

http://git-wip-us.apache.org/repos/asf/storm/blob/c356c35a/external/storm-metrics/src/main/resources/resources/libsigar-ia64-linux.so
----------------------------------------------------------------------
diff --git a/external/storm-metrics/src/main/resources/resources/libsigar-ia64-linux.so b/external/storm-metrics/src/main/resources/resources/libsigar-ia64-linux.so
new file mode 100644
index 0000000..2bd2fc8
Binary files /dev/null and b/external/storm-metrics/src/main/resources/resources/libsigar-ia64-linux.so differ

http://git-wip-us.apache.org/repos/asf/storm/blob/c356c35a/external/storm-metrics/src/main/resources/resources/libsigar-pa-hpux-11.sl
----------------------------------------------------------------------
diff --git a/external/storm-metrics/src/main/resources/resources/libsigar-pa-hpux-11.sl b/external/storm-metrics/src/main/resources/resources/libsigar-pa-hpux-11.sl
new file mode 100755
index 0000000..0dfd8a1
Binary files /dev/null and b/external/storm-metrics/src/main/resources/resources/libsigar-pa-hpux-11.sl differ

http://git-wip-us.apache.org/repos/asf/storm/blob/c356c35a/external/storm-metrics/src/main/resources/resources/libsigar-ppc-aix-5.so
----------------------------------------------------------------------
diff --git a/external/storm-metrics/src/main/resources/resources/libsigar-ppc-aix-5.so b/external/storm-metrics/src/main/resources/resources/libsigar-ppc-aix-5.so
new file mode 100644
index 0000000..7d4b519
Binary files /dev/null and b/external/storm-metrics/src/main/resources/resources/libsigar-ppc-aix-5.so differ

http://git-wip-us.apache.org/repos/asf/storm/blob/c356c35a/external/storm-metrics/src/main/resources/resources/libsigar-ppc-linux.so
----------------------------------------------------------------------
diff --git a/external/storm-metrics/src/main/resources/resources/libsigar-ppc-linux.so b/external/storm-metrics/src/main/resources/resources/libsigar-ppc-linux.so
new file mode 100644
index 0000000..4394b1b
Binary files /dev/null and b/external/storm-metrics/src/main/resources/resources/libsigar-ppc-linux.so differ

http://git-wip-us.apache.org/repos/asf/storm/blob/c356c35a/external/storm-metrics/src/main/resources/resources/libsigar-ppc64-aix-5.so
----------------------------------------------------------------------
diff --git a/external/storm-metrics/src/main/resources/resources/libsigar-ppc64-aix-5.so b/external/storm-metrics/src/main/resources/resources/libsigar-ppc64-aix-5.so
new file mode 100644
index 0000000..35fd828
Binary files /dev/null and b/external/storm-metrics/src/main/resources/resources/libsigar-ppc64-aix-5.so differ

http://git-wip-us.apache.org/repos/asf/storm/blob/c356c35a/external/storm-metrics/src/main/resources/resources/libsigar-ppc64-linux.so
----------------------------------------------------------------------
diff --git a/external/storm-metrics/src/main/resources/resources/libsigar-ppc64-linux.so b/external/storm-metrics/src/main/resources/resources/libsigar-ppc64-linux.so
new file mode 100644
index 0000000..a1ba252
Binary files /dev/null and b/external/storm-metrics/src/main/resources/resources/libsigar-ppc64-linux.so differ

http://git-wip-us.apache.org/repos/asf/storm/blob/c356c35a/external/storm-metrics/src/main/resources/resources/libsigar-s390x-linux.so
----------------------------------------------------------------------
diff --git a/external/storm-metrics/src/main/resources/resources/libsigar-s390x-linux.so b/external/storm-metrics/src/main/resources/resources/libsigar-s390x-linux.so
new file mode 100644
index 0000000..c275f4a
Binary files /dev/null and b/external/storm-metrics/src/main/resources/resources/libsigar-s390x-linux.so differ

http://git-wip-us.apache.org/repos/asf/storm/blob/c356c35a/external/storm-metrics/src/main/resources/resources/libsigar-sparc-solaris.so
----------------------------------------------------------------------
diff --git a/external/storm-metrics/src/main/resources/resources/libsigar-sparc-solaris.so b/external/storm-metrics/src/main/resources/resources/libsigar-sparc-solaris.so
new file mode 100644
index 0000000..aa847d2
Binary files /dev/null and b/external/storm-metrics/src/main/resources/resources/libsigar-sparc-solaris.so differ

http://git-wip-us.apache.org/repos/asf/storm/blob/c356c35a/external/storm-metrics/src/main/resources/resources/libsigar-sparc64-solaris.so
----------------------------------------------------------------------
diff --git a/external/storm-metrics/src/main/resources/resources/libsigar-sparc64-solaris.so b/external/storm-metrics/src/main/resources/resources/libsigar-sparc64-solaris.so
new file mode 100644
index 0000000..6c4fe80
Binary files /dev/null and b/external/storm-metrics/src/main/resources/resources/libsigar-sparc64-solaris.so differ

http://git-wip-us.apache.org/repos/asf/storm/blob/c356c35a/external/storm-metrics/src/main/resources/resources/libsigar-universal-macosx.dylib
----------------------------------------------------------------------
diff --git a/external/storm-metrics/src/main/resources/resources/libsigar-universal-macosx.dylib b/external/storm-metrics/src/main/resources/resources/libsigar-universal-macosx.dylib
new file mode 100644
index 0000000..27ab107
Binary files /dev/null and b/external/storm-metrics/src/main/resources/resources/libsigar-universal-macosx.dylib differ

http://git-wip-us.apache.org/repos/asf/storm/blob/c356c35a/external/storm-metrics/src/main/resources/resources/libsigar-universal64-macosx.dylib
----------------------------------------------------------------------
diff --git a/external/storm-metrics/src/main/resources/resources/libsigar-universal64-macosx.dylib b/external/storm-metrics/src/main/resources/resources/libsigar-universal64-macosx.dylib
new file mode 100644
index 0000000..0c721fe
Binary files /dev/null and b/external/storm-metrics/src/main/resources/resources/libsigar-universal64-macosx.dylib differ

http://git-wip-us.apache.org/repos/asf/storm/blob/c356c35a/external/storm-metrics/src/main/resources/resources/libsigar-x86-freebsd-5.so
----------------------------------------------------------------------
diff --git a/external/storm-metrics/src/main/resources/resources/libsigar-x86-freebsd-5.so b/external/storm-metrics/src/main/resources/resources/libsigar-x86-freebsd-5.so
new file mode 100644
index 0000000..8c50c61
Binary files /dev/null and b/external/storm-metrics/src/main/resources/resources/libsigar-x86-freebsd-5.so differ

http://git-wip-us.apache.org/repos/asf/storm/blob/c356c35a/external/storm-metrics/src/main/resources/resources/libsigar-x86-freebsd-6.so
----------------------------------------------------------------------
diff --git a/external/storm-metrics/src/main/resources/resources/libsigar-x86-freebsd-6.so b/external/storm-metrics/src/main/resources/resources/libsigar-x86-freebsd-6.so
new file mode 100644
index 0000000..f080027
Binary files /dev/null and b/external/storm-metrics/src/main/resources/resources/libsigar-x86-freebsd-6.so differ

http://git-wip-us.apache.org/repos/asf/storm/blob/c356c35a/external/storm-metrics/src/main/resources/resources/libsigar-x86-linux.so
----------------------------------------------------------------------
diff --git a/external/storm-metrics/src/main/resources/resources/libsigar-x86-linux.so b/external/storm-metrics/src/main/resources/resources/libsigar-x86-linux.so
new file mode 100644
index 0000000..a0b64ed
Binary files /dev/null and b/external/storm-metrics/src/main/resources/resources/libsigar-x86-linux.so differ

http://git-wip-us.apache.org/repos/asf/storm/blob/c356c35a/external/storm-metrics/src/main/resources/resources/libsigar-x86-solaris.so
----------------------------------------------------------------------
diff --git a/external/storm-metrics/src/main/resources/resources/libsigar-x86-solaris.so b/external/storm-metrics/src/main/resources/resources/libsigar-x86-solaris.so
new file mode 100644
index 0000000..c6452e5
Binary files /dev/null and b/external/storm-metrics/src/main/resources/resources/libsigar-x86-solaris.so differ

http://git-wip-us.apache.org/repos/asf/storm/blob/c356c35a/external/storm-metrics/src/main/resources/resources/sigar-amd64-winnt.dll
----------------------------------------------------------------------
diff --git a/external/storm-metrics/src/main/resources/resources/sigar-amd64-winnt.dll b/external/storm-metrics/src/main/resources/resources/sigar-amd64-winnt.dll
new file mode 100644
index 0000000..1ec8a03
Binary files /dev/null and b/external/storm-metrics/src/main/resources/resources/sigar-amd64-winnt.dll differ

http://git-wip-us.apache.org/repos/asf/storm/blob/c356c35a/external/storm-metrics/src/main/resources/resources/sigar-x86-winnt.dll
----------------------------------------------------------------------
diff --git a/external/storm-metrics/src/main/resources/resources/sigar-x86-winnt.dll b/external/storm-metrics/src/main/resources/resources/sigar-x86-winnt.dll
new file mode 100644
index 0000000..6afdc01
Binary files /dev/null and b/external/storm-metrics/src/main/resources/resources/sigar-x86-winnt.dll differ

http://git-wip-us.apache.org/repos/asf/storm/blob/c356c35a/external/storm-metrics/src/main/resources/resources/sigar-x86-winnt.lib
----------------------------------------------------------------------
diff --git a/external/storm-metrics/src/main/resources/resources/sigar-x86-winnt.lib b/external/storm-metrics/src/main/resources/resources/sigar-x86-winnt.lib
new file mode 100644
index 0000000..04924a1
Binary files /dev/null and b/external/storm-metrics/src/main/resources/resources/sigar-x86-winnt.lib differ

http://git-wip-us.apache.org/repos/asf/storm/blob/c356c35a/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 7f525bb..9900623 100644
--- a/pom.xml
+++ b/pom.xml
@@ -213,6 +213,7 @@
         <jackson.version>2.3.1</jackson.version>
         <thrift.version>0.9.2</thrift.version>
         <junit.version>4.11</junit.version>
+        <hdrhistogram.version>2.1.7</hdrhistogram.version>
     </properties>
 
     <modules>
@@ -232,6 +233,7 @@
         <module>external/flux</module>
         <module>external/storm-elasticsearch</module>
         <module>external/storm-solr</module>
+        <module>external/storm-metrics</module>
         <module>examples/storm-starter</module>
     </modules>
 
@@ -329,6 +331,11 @@
     <dependencyManagement>
         <dependencies>
             <dependency>
+                <groupId>org.hdrhistogram</groupId>
+                <artifactId>HdrHistogram</artifactId>
+                <version>${hdrhistogram.version}</version>
+            </dependency>
+            <dependency>
                 <groupId>org.clojure</groupId>
                 <artifactId>clojure</artifactId>
                 <version>${clojure.version}</version>

http://git-wip-us.apache.org/repos/asf/storm/blob/c356c35a/storm-core/src/clj/backtype/storm/daemon/nimbus.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/backtype/storm/daemon/nimbus.clj b/storm-core/src/clj/backtype/storm/daemon/nimbus.clj
index eee7bac..54f787e 100644
--- a/storm-core/src/clj/backtype/storm/daemon/nimbus.clj
+++ b/storm-core/src/clj/backtype/storm/daemon/nimbus.clj
@@ -1535,7 +1535,7 @@
             (when-let [sched-status (.get @(:id->sched-status nimbus) storm-id)] (.set_sched_status topo-info sched-status))
             (when-let [component->debug (:component->debug base)]
               (.set_component_debug topo-info (map-val converter/thriftify-debugoptions component->debug)))
-            (.set_replication_count topo-info (.getReplicationCount (:code-distributor nimbus) storm-id))
+            (.set_replication_count topo-info (if (:code-distributor nimbus) (.getReplicationCount (:code-distributor nimbus) storm-id) 1))
             topo-info
           ))
 

http://git-wip-us.apache.org/repos/asf/storm/blob/c356c35a/storm-core/src/jvm/backtype/storm/metric/HttpForwardingMetricsConsumer.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/metric/HttpForwardingMetricsConsumer.java b/storm-core/src/jvm/backtype/storm/metric/HttpForwardingMetricsConsumer.java
new file mode 100644
index 0000000..77fe49c
--- /dev/null
+++ b/storm-core/src/jvm/backtype/storm/metric/HttpForwardingMetricsConsumer.java
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package backtype.storm.metric;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Map;
+import java.net.URL;
+import java.net.URLConnection;
+import java.net.HttpURLConnection;
+
+import com.esotericsoftware.kryo.io.Output;
+import backtype.storm.serialization.KryoValuesSerializer;
+
+import backtype.storm.metric.api.IMetricsConsumer;
+import backtype.storm.task.IErrorReporter;
+import backtype.storm.task.TopologyContext;
+
+/**
+ * Listens for all metrics and POSTs them serialized to a configured URL
+ *
+ * To use, add this to your topology's configuration:
+ *   conf.registerMetricsConsumer(backtype.storm.metric.HttpForwardingMetricsConsumer.class, "http://example.com:8080/metrics/my-topology/", 1);
+ *
+ * The body of the post is data serialized using backtype.storm.serialization.KryoValuesSerializer, with the data passed in
+ * as a list of [TaskInfo, Collection<DataPoint>].  More things may be appended to the end of the list in the future.
+ *
+ * The values can be deserialized using the backtype.storm.serialization.KryoValuesDeserializer, and a 
+ * correct config + classpath. 
+ */
+public class HttpForwardingMetricsConsumer implements IMetricsConsumer {
+    private transient URL _url; 
+    private transient IErrorReporter _errorReporter;
+    private transient KryoValuesSerializer _serializer;
+
+    @Override
+    public void prepare(Map stormConf, Object registrationArgument, TopologyContext context, IErrorReporter errorReporter) { 
+        try {
+            _url = new URL((String)registrationArgument);
+            _errorReporter = errorReporter;
+            _serializer = new KryoValuesSerializer(stormConf);
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    @Override
+    public void handleDataPoints(TaskInfo taskInfo, Collection<DataPoint> dataPoints) {
+        try {
+            HttpURLConnection con = (HttpURLConnection)_url.openConnection();
+            con.setRequestMethod("POST");
+            con.setDoOutput(true);
+            Output out = new Output(con.getOutputStream());
+            _serializer.serializeInto(Arrays.asList(taskInfo, dataPoints), out);
+            out.flush();
+            out.close();
+            //The request is not sent unless a response is requested
+            int response = con.getResponseCode();
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    @Override
+    public void cleanup() { }
+}
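
A note on the POST in handleDataPoints above: the sketch below is not part of the commit. It shows the same HttpURLConnection sequence against a throwaway local server, using only the JDK. The class name PostSketch, the method postAndCount, and the /metrics path are hypothetical, and the JDK's built-in com.sun.net.httpserver.HttpServer stands in for the Jetty-based server. It assumes the JDK's default buffered output mode, where the body is held in memory until the response is requested.

```java
import com.sun.net.httpserver.HttpServer;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.HttpURLConnection;
import java.net.InetSocketAddress;
import java.net.URL;
import java.util.concurrent.atomic.AtomicInteger;

class PostSketch {
    // Starts a throwaway local server, POSTs the payload to it, and returns
    // {HTTP status code, number of body bytes the server received}.
    static int[] postAndCount(byte[] payload) throws Exception {
        final AtomicInteger received = new AtomicInteger(-1);
        HttpServer server = HttpServer.create(new InetSocketAddress("127.0.0.1", 0), 0);
        server.createContext("/", exchange -> {
            try (InputStream in = exchange.getRequestBody()) {
                int n = 0;
                while (in.read() != -1) {
                    n++;
                }
                received.set(n);
            }
            exchange.sendResponseHeaders(200, -1); // 200 with no response body
            exchange.close();
        });
        server.start();
        try {
            URL url = new URL("http://127.0.0.1:" + server.getAddress().getPort() + "/metrics");
            HttpURLConnection con = (HttpURLConnection) url.openConnection();
            con.setRequestMethod("POST");
            con.setDoOutput(true);
            try (OutputStream out = con.getOutputStream()) {
                out.write(payload);
            }
            // Asking for the response code is what pushes the buffered
            // request to the server in the default (non-streaming) mode.
            int code = con.getResponseCode();
            con.disconnect();
            return new int[] { code, received.get() };
        } finally {
            server.stop(0);
        }
    }

    public static void main(String[] args) throws Exception {
        int[] r = postAndCount(new byte[] {1, 2, 3});
        System.out.println("status=" + r[0] + " bytes=" + r[1]);
    }
}
```

In the commit itself the body is a Kryo-serialized list rather than raw bytes, so a real receiver would need a matching KryoValuesDeserializer and classpath.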

http://git-wip-us.apache.org/repos/asf/storm/blob/c356c35a/storm-core/src/jvm/backtype/storm/metric/HttpForwardingMetricsServer.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/metric/HttpForwardingMetricsServer.java b/storm-core/src/jvm/backtype/storm/metric/HttpForwardingMetricsServer.java
new file mode 100644
index 0000000..6441a39
--- /dev/null
+++ b/storm-core/src/jvm/backtype/storm/metric/HttpForwardingMetricsServer.java
@@ -0,0 +1,119 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package backtype.storm.metric;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Map;
+import java.util.List;
+import java.net.ServerSocket;
+import java.net.InetAddress;
+
+import javax.servlet.http.HttpServlet;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+import javax.servlet.ServletException;
+
+import backtype.storm.metric.api.IMetricsConsumer.TaskInfo;
+import backtype.storm.metric.api.IMetricsConsumer.DataPoint;
+
+import com.esotericsoftware.kryo.io.Input;
+import backtype.storm.serialization.KryoValuesDeserializer;
+import backtype.storm.utils.Utils;
+
+import org.eclipse.jetty.server.Server;
+import org.eclipse.jetty.servlet.ServletContextHandler;
+import org.eclipse.jetty.servlet.ServletHolder;
+
+/**
+ * A server that can listen for metrics from the HttpForwardingMetricsConsumer.
+ */
+public abstract class HttpForwardingMetricsServer {
+    private Map _conf;
+    private Server _server = null;
+    private int _port = -1;
+    private String _url = null;
+
+    ThreadLocal<KryoValuesDeserializer> _des = new ThreadLocal<KryoValuesDeserializer>() {
+        @Override
+        protected KryoValuesDeserializer initialValue() {
+            return new KryoValuesDeserializer(_conf);
+        }
+    };
+
+    private class MetricsCollectionServlet extends HttpServlet
+    {
+        protected void doPost(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException
+        {
+            Input in = new Input(request.getInputStream());
+            List<Object> metrics = _des.get().deserializeFrom(in);
+            handle((TaskInfo)metrics.get(0), (Collection<DataPoint>)metrics.get(1));
+            response.setStatus(HttpServletResponse.SC_OK);
+        }
+    }
+
+    public HttpForwardingMetricsServer(Map conf) {
+        _conf = Utils.readStormConfig();
+        if (conf != null) {
+            _conf.putAll(conf);
+        }
+    }
+
+    //This needs to be thread safe
+    public abstract void handle(TaskInfo taskInfo, Collection<DataPoint> dataPoints);
+
+    public void serve(Integer port) {
+        try {
+            if (_server != null) throw new RuntimeException("The server is already running");
+    
+            if (port == null || port <= 0) {
+                ServerSocket s = new ServerSocket(0);
+                port = s.getLocalPort();
+                s.close();
+            }
+            _server = new Server(port);
+            _port = port;
+            _url = "http://"+InetAddress.getLocalHost().getHostName()+":"+_port+"/";
+ 
+            ServletContextHandler context = new ServletContextHandler(ServletContextHandler.SESSIONS);
+            context.setContextPath("/");
+            _server.setHandler(context);
+ 
+            context.addServlet(new ServletHolder(new MetricsCollectionServlet()),"/*");
+
+            _server.start();
+        } catch (RuntimeException e) {
+            throw e;
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    public void serve() {
+        serve(null);
+    }
+
+    public int getPort() {
+        return _port;
+    }
+
+    public String getUrl() {
+        return _url;
+    }
+}
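
A note on the port selection in serve(Integer) above: when no usable port is given, the method binds a ServerSocket to port 0, reads back the port the OS assigned, and closes the socket before handing the number to Jetty. The sketch below is not part of the commit and isolates that step with only the JDK; the names FreePortSketch, pickFreePort, and resolvePort are hypothetical.

```java
import java.io.IOException;
import java.net.ServerSocket;

class FreePortSketch {
    // Asks the OS for an unused TCP port by binding to port 0, then releases it.
    static int pickFreePort() throws IOException {
        try (ServerSocket s = new ServerSocket(0)) {
            return s.getLocalPort();
        }
    }

    // Mirrors the rule in serve(): null or a non-positive value means "pick one for me".
    static int resolvePort(Integer requested) throws IOException {
        return (requested == null || requested <= 0) ? pickFreePort() : requested;
    }

    public static void main(String[] args) throws IOException {
        System.out.println("picked port " + resolvePort(null));
        System.out.println("kept port " + resolvePort(8080));
    }
}
```

One caveat with this approach: the port is free only at the moment it is probed. Another process could in principle claim it between the close and the later bind, which is usually acceptable for a test harness like this one.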


[2/5] storm git commit: Added in support for memory to test.

Posted by bo...@apache.org.
Added in support for memory to test.


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/4c2cb4f1
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/4c2cb4f1
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/4c2cb4f1

Branch: refs/heads/master
Commit: 4c2cb4f165bf0c5e6167e7bd2ad4e157ed5be7eb
Parents: c356c35
Author: Robert (Bobby) Evans <ev...@yahoo-inc.com>
Authored: Mon Oct 19 08:10:42 2015 -0500
Committer: Robert (Bobby) Evans <ev...@yahoo-inc.com>
Committed: Mon Oct 19 08:10:42 2015 -0500

----------------------------------------------------------------------
 .../jvm/storm/starter/ThroughputVsLatency.java  | 49 ++++++++++++++++++--
 1 file changed, 46 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/4c2cb4f1/examples/storm-starter/src/jvm/storm/starter/ThroughputVsLatency.java
----------------------------------------------------------------------
diff --git a/examples/storm-starter/src/jvm/storm/starter/ThroughputVsLatency.java b/examples/storm-starter/src/jvm/storm/starter/ThroughputVsLatency.java
index 0ef1256..a9070ae 100644
--- a/examples/storm-starter/src/jvm/storm/starter/ThroughputVsLatency.java
+++ b/examples/storm-starter/src/jvm/storm/starter/ThroughputVsLatency.java
@@ -45,6 +45,7 @@ import java.util.Collection;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Random;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.atomic.AtomicLong;
 
@@ -216,11 +217,38 @@ public class ThroughputVsLatency {
     }
   }
 
+  private static class MemMeasure {
+    private long _mem = 0;
+    private long _time = 0;
+
+    public synchronized void update(long mem) {
+        _mem = mem;
+        _time = System.currentTimeMillis();
+    }
+
+    public synchronized long get() {
+        return isExpired() ? 0L : _mem;
+    }
+
+    public synchronized boolean isExpired() {
+        return (System.currentTimeMillis() - _time) >= 20000;
+    }
+  }
+
   private static final Histogram _histo = new Histogram(3600000000000L, 3);
   private static final AtomicLong _systemCPU = new AtomicLong(0);
   private static final AtomicLong _userCPU = new AtomicLong(0);
   private static final AtomicLong _gcCount = new AtomicLong(0);
   private static final AtomicLong _gcMs = new AtomicLong(0);
+  private static final ConcurrentHashMap<String, MemMeasure> _memoryBytes = new ConcurrentHashMap<String, MemMeasure>();
+
+  private static long readMemory() {
+    long total = 0;
+    for (MemMeasure mem: _memoryBytes.values()) {
+      total += mem.get();
+    }
+    return total;
+  }
 
   private static long _prev_acked = 0;
   private static long _prev_uptime = 0;
@@ -273,10 +301,12 @@ public class ThroughputVsLatency {
     long user = _userCPU.getAndSet(0);
     long sys = _systemCPU.getAndSet(0);
     long gc = _gcMs.getAndSet(0);
-    System.out.printf("uptime: %,4d acked: %,8d acked/sec: %,8.2f failed: %,8d " +
-                      "99%%: %,15d 99.9%%: %,15d min: %,15d max: %,15d mean: %,15.2f stddev: %,15.2f user: %,10d sys: %,10d gc: %,10d\n",
+    double memMB = readMemory() / (1024.0 * 1024.0);
+    System.out.printf("uptime: %,4d acked: %,9d acked/sec: %,10.2f failed: %,8d " +
+                      "99%%: %,15d 99.9%%: %,15d min: %,15d max: %,15d mean: %,15.2f " +
+                      "stddev: %,15.2f user: %,10d sys: %,10d gc: %,10d mem: %,10.2f\n",
                        uptime, ackedThisTime, (((double)ackedThisTime)/thisTime), failed, nnpct, nnnpct,
-                       min, max, mean, stddev, user, sys, gc);
+                       min, max, mean, stddev, user, sys, gc, memMB);
     _prev_uptime = uptime;
     _prev_acked = acked;
   }
@@ -312,6 +342,7 @@ public class ThroughputVsLatency {
     HttpForwardingMetricsServer metricServer = new HttpForwardingMetricsServer(conf) {
         @Override
         public void handle(TaskInfo taskInfo, Collection<DataPoint> dataPoints) {
+            String worker = taskInfo.srcWorkerHost + ":" + taskInfo.srcWorkerPort;
             for (DataPoint dp: dataPoints) {
                 if ("comp-lat-histo".equals(dp.name) && dp.value instanceof Histogram) {
                     synchronized(_histo) {
@@ -337,6 +368,18 @@ public class ThroughputVsLatency {
                    if (time instanceof Number) {
                        _gcMs.getAndAdd(((Number)time).longValue());
                    }
+                } else if (dp.name.startsWith("memory/") && dp.value instanceof Map) {
+                   Map<Object, Object> m = (Map<Object, Object>)dp.value;
+                   Object val = m.get("usedBytes");
+                   if (val instanceof Number) {
+                       MemMeasure mm = _memoryBytes.get(worker);
+                       if (mm == null) {
+                         mm = new MemMeasure();
+                         MemMeasure tmp = _memoryBytes.putIfAbsent(worker, mm);
+                         mm = tmp == null ? mm : tmp; 
+                       }
+                       mm.update(((Number)val).longValue());
+                   }
                 }
             }
         }
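
A note on the memory tracking added in this commit: each worker (host:port) keeps only its latest usedBytes reading, a reading older than 20 seconds counts as zero, and the reported total is the sum across workers. The sketch below is not part of the commit and restates that idea in a self-contained form. The names ExpiringGaugeSketch, Measure, report, and total are hypothetical. Two things differ from the commit and are assumptions made for illustration: the clock is passed in explicitly so the behavior can be checked deterministically, and computeIfAbsent (Java 8+) replaces the get/putIfAbsent sequence.

```java
import java.util.concurrent.ConcurrentHashMap;

class ExpiringGaugeSketch {
    static final long TTL_MS = 20_000; // same 20 second window as MemMeasure

    static final class Measure {
        private long value;
        private long updatedAt;

        synchronized void update(long v, long nowMs) {
            value = v;
            updatedAt = nowMs;
        }

        // A stale reading contributes nothing to the total.
        synchronized long get(long nowMs) {
            return (nowMs - updatedAt) >= TTL_MS ? 0L : value;
        }
    }

    private final ConcurrentHashMap<String, Measure> byWorker = new ConcurrentHashMap<>();

    // Records the latest reading for one worker, creating its slot on first use.
    void report(String worker, long bytes, long nowMs) {
        byWorker.computeIfAbsent(worker, k -> new Measure()).update(bytes, nowMs);
    }

    // Sums the non-expired readings across all workers.
    long total(long nowMs) {
        long sum = 0;
        for (Measure m : byWorker.values()) {
            sum += m.get(nowMs);
        }
        return sum;
    }

    public static void main(String[] args) {
        ExpiringGaugeSketch g = new ExpiringGaugeSketch();
        g.report("host-a:6700", 100, 1_000);
        g.report("host-b:6700", 50, 15_000);
        System.out.println(g.total(16_000)); // both readings are fresh
        System.out.println(g.total(25_000)); // host-a's reading has expired
    }
}
```

Expiring stale entries keeps a worker that has died or been rescheduled from inflating the memory total indefinitely.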


[3/5] storm git commit: Adjusted memory parameters to be closer to what Yahoo uses in production. Fixed small bug

Posted by bo...@apache.org.
Adjusted memory parameters to be closer to what Yahoo uses in production.  Fixed small bug


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/b60bb72c
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/b60bb72c
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/b60bb72c

Branch: refs/heads/master
Commit: b60bb72c9f572f5cdd99d8deeba1ab214d53e18d
Parents: 4c2cb4f
Author: Robert (Bobby) Evans <ev...@yahoo-inc.com>
Authored: Mon Oct 19 08:32:56 2015 -0500
Committer: Robert (Bobby) Evans <ev...@yahoo-inc.com>
Committed: Mon Oct 19 08:32:56 2015 -0500

----------------------------------------------------------------------
 .../src/jvm/storm/starter/ThroughputVsLatency.java    | 14 ++++++++++++--
 1 file changed, 12 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/b60bb72c/examples/storm-starter/src/jvm/storm/starter/ThroughputVsLatency.java
----------------------------------------------------------------------
diff --git a/examples/storm-starter/src/jvm/storm/starter/ThroughputVsLatency.java b/examples/storm-starter/src/jvm/storm/starter/ThroughputVsLatency.java
index a9070ae..2608755 100644
--- a/examples/storm-starter/src/jvm/storm/starter/ThroughputVsLatency.java
+++ b/examples/storm-starter/src/jvm/storm/starter/ThroughputVsLatency.java
@@ -115,6 +115,10 @@ public class ThroughputVsLatency {
         StormSubmitter.submitTopology(name, stormConf, topology);
       }
     }
+
+    public boolean isLocal() {
+      return _local != null;
+    }
   }
 
   public static class FastRandomSentenceSpout extends BaseRichSpout {
@@ -388,12 +392,19 @@ public class ThroughputVsLatency {
     metricServer.serve();
     String url = metricServer.getUrl();
 
+    C cluster = new C(conf);
     conf.setNumWorkers(parallelism);
     conf.registerMetricsConsumer(backtype.storm.metric.HttpForwardingMetricsConsumer.class, url, 1);
     Map<String, String> workerMetrics = new HashMap<String, String>();
-    workerMetrics.put("CPU", "org.apache.storm.metrics.sigar.CPUMetric");
+    if (!cluster.isLocal()) {
+      //sigar uses JNI and does not work in local mode
+      workerMetrics.put("CPU", "org.apache.storm.metrics.sigar.CPUMetric");
+    }
     conf.put(Config.TOPOLOGY_WORKER_METRICS, workerMetrics);
     conf.put(Config.TOPOLOGY_BUILTIN_METRICS_BUCKET_SIZE_SECS, 10);
+    conf.put(Config.TOPOLOGY_WORKER_GC_CHILDOPTS,
+      "-XX:+UseConcMarkSweepGC -XX:+UseParNewGC -XX:NewSize=128m -XX:CMSInitiatingOccupancyFraction=70 -XX:-CMSConcurrentMTEnabled");
+    conf.put(Config.TOPOLOGY_WORKER_CHILDOPTS, "-Xmx2g");
 
     TopologyBuilder builder = new TopologyBuilder();
 
@@ -403,7 +414,6 @@ public class ThroughputVsLatency {
     builder.setBolt("split", new SplitSentence(), numEach).shuffleGrouping("spout");
     builder.setBolt("count", new WordCount(), numEach).fieldsGrouping("split", new Fields("word"));
 
-    C cluster = new C(conf);
     try {
         cluster.submitTopology(name, conf, builder.createTopology());
 


[4/5] storm git commit: Merge branch 'STORM-1118-perf' of https://github.com/revans2/incubator-storm into STORM-1118

Posted by bo...@apache.org.
Merge branch 'STORM-1118-perf' of https://github.com/revans2/incubator-storm into STORM-1118

STORM-1118: Added test to compare latency vs. throughput in storm.


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/d6d9b788
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/d6d9b788
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/d6d9b788

Branch: refs/heads/master
Commit: d6d9b788e88fd440df1f3b4a0ecbacdb42ce4fd1
Parents: 647d672 b60bb72
Author: Robert (Bobby) Evans <ev...@yahoo-inc.com>
Authored: Tue Oct 20 09:38:21 2015 -0500
Committer: Robert (Bobby) Evans <ev...@yahoo-inc.com>
Committed: Tue Oct 20 09:38:21 2015 -0500

----------------------------------------------------------------------
 LICENSE                                         |   9 +
 examples/storm-starter/pom.xml                  |   9 +
 .../jvm/storm/starter/ThroughputVsLatency.java  | 429 +++++++++++++++++++
 external/storm-metrics/pom.xml                  |  58 +++
 .../metrics/hdrhistogram/HistogramMetric.java   |  79 ++++
 .../apache/storm/metrics/sigar/CPUMetric.java   |  60 +++
 .../resources/libsigar-amd64-freebsd-6.so       | Bin 0 -> 210641 bytes
 .../resources/resources/libsigar-amd64-linux.so | Bin 0 -> 246605 bytes
 .../resources/libsigar-amd64-solaris.so         | Bin 0 -> 251360 bytes
 .../resources/libsigar-ia64-hpux-11.sl          | Bin 0 -> 577452 bytes
 .../resources/resources/libsigar-ia64-linux.so  | Bin 0 -> 494929 bytes
 .../resources/resources/libsigar-pa-hpux-11.sl  | Bin 0 -> 516096 bytes
 .../resources/resources/libsigar-ppc-aix-5.so   | Bin 0 -> 400925 bytes
 .../resources/resources/libsigar-ppc-linux.so   | Bin 0 -> 258547 bytes
 .../resources/resources/libsigar-ppc64-aix-5.so | Bin 0 -> 425077 bytes
 .../resources/resources/libsigar-ppc64-linux.so | Bin 0 -> 330767 bytes
 .../resources/resources/libsigar-s390x-linux.so | Bin 0 -> 269932 bytes
 .../resources/libsigar-sparc-solaris.so         | Bin 0 -> 285004 bytes
 .../resources/libsigar-sparc64-solaris.so       | Bin 0 -> 261896 bytes
 .../resources/libsigar-universal-macosx.dylib   | Bin 0 -> 377668 bytes
 .../resources/libsigar-universal64-macosx.dylib | Bin 0 -> 397440 bytes
 .../resources/libsigar-x86-freebsd-5.so         | Bin 0 -> 179751 bytes
 .../resources/libsigar-x86-freebsd-6.so         | Bin 0 -> 179379 bytes
 .../resources/resources/libsigar-x86-linux.so   | Bin 0 -> 233385 bytes
 .../resources/resources/libsigar-x86-solaris.so | Bin 0 -> 242880 bytes
 .../resources/resources/sigar-amd64-winnt.dll   | Bin 0 -> 402432 bytes
 .../resources/resources/sigar-x86-winnt.dll     | Bin 0 -> 266240 bytes
 .../resources/resources/sigar-x86-winnt.lib     | Bin 0 -> 99584 bytes
 pom.xml                                         |   7 +
 .../src/clj/backtype/storm/daemon/nimbus.clj    |   2 +-
 .../metric/HttpForwardingMetricsConsumer.java   |  81 ++++
 .../metric/HttpForwardingMetricsServer.java     | 119 +++++
 32 files changed, 852 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/d6d9b788/storm-core/src/clj/backtype/storm/daemon/nimbus.clj
----------------------------------------------------------------------


[5/5] storm git commit: Added STORM-1118 to Changelog

Posted by bo...@apache.org.
Added STORM-1118 to Changelog


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/4b4bbd08
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/4b4bbd08
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/4b4bbd08

Branch: refs/heads/master
Commit: 4b4bbd08bbf881566f9e79d3158ad0e486b00c8f
Parents: d6d9b78
Author: Robert (Bobby) Evans <ev...@yahoo-inc.com>
Authored: Tue Oct 20 09:51:02 2015 -0500
Committer: Robert (Bobby) Evans <ev...@yahoo-inc.com>
Committed: Tue Oct 20 09:51:02 2015 -0500

----------------------------------------------------------------------
 CHANGELOG.md | 1 +
 1 file changed, 1 insertion(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/4b4bbd08/CHANGELOG.md
----------------------------------------------------------------------
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 15a554d..0308cf3 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,4 +1,5 @@
 ## 0.11.0
+ * STORM-1118: Added test to compare latency vs. throughput in storm.
  * STORM-1110: Fix Component Page for system components
  * STORM-1093: Launching Workers with resources specified in resource-aware schedulers
  * STORM-1102: Add a default flush interval for HiveBolt