Posted to commits@storm.apache.org by bo...@apache.org on 2015/10/20 16:51:48 UTC
[1/5] storm git commit: STORM-1118: Added test to compare latency vs
throughput in storm.
Repository: storm
Updated Branches:
refs/heads/master 647d672b5 -> 4b4bbd08b
STORM-1118: Added test to compare latency vs throughput in storm.
Also added a metrics-gathering server for testing, an HdrHistogram-based metric, and a CPU utilization metric using Sigar.
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/c356c35a
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/c356c35a
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/c356c35a
Branch: refs/heads/master
Commit: c356c35aeab17bc14861bf7efa8ddedd3f257741
Parents: 1d393ee
Author: Robert (Bobby) Evans <ev...@yahoo-inc.com>
Authored: Fri Oct 16 14:57:23 2015 -0500
Committer: Robert (Bobby) Evans <ev...@yahoo-inc.com>
Committed: Sat Oct 17 13:59:23 2015 -0500
----------------------------------------------------------------------
LICENSE | 9 +
examples/storm-starter/pom.xml | 9 +
.../jvm/storm/starter/ThroughputVsLatency.java | 376 +++++++++++++++++++
external/storm-metrics/pom.xml | 58 +++
.../metrics/hdrhistogram/HistogramMetric.java | 79 ++++
.../apache/storm/metrics/sigar/CPUMetric.java | 60 +++
.../resources/libsigar-amd64-freebsd-6.so | Bin 0 -> 210641 bytes
.../resources/resources/libsigar-amd64-linux.so | Bin 0 -> 246605 bytes
.../resources/libsigar-amd64-solaris.so | Bin 0 -> 251360 bytes
.../resources/libsigar-ia64-hpux-11.sl | Bin 0 -> 577452 bytes
.../resources/resources/libsigar-ia64-linux.so | Bin 0 -> 494929 bytes
.../resources/resources/libsigar-pa-hpux-11.sl | Bin 0 -> 516096 bytes
.../resources/resources/libsigar-ppc-aix-5.so | Bin 0 -> 400925 bytes
.../resources/resources/libsigar-ppc-linux.so | Bin 0 -> 258547 bytes
.../resources/resources/libsigar-ppc64-aix-5.so | Bin 0 -> 425077 bytes
.../resources/resources/libsigar-ppc64-linux.so | Bin 0 -> 330767 bytes
.../resources/resources/libsigar-s390x-linux.so | Bin 0 -> 269932 bytes
.../resources/libsigar-sparc-solaris.so | Bin 0 -> 285004 bytes
.../resources/libsigar-sparc64-solaris.so | Bin 0 -> 261896 bytes
.../resources/libsigar-universal-macosx.dylib | Bin 0 -> 377668 bytes
.../resources/libsigar-universal64-macosx.dylib | Bin 0 -> 397440 bytes
.../resources/libsigar-x86-freebsd-5.so | Bin 0 -> 179751 bytes
.../resources/libsigar-x86-freebsd-6.so | Bin 0 -> 179379 bytes
.../resources/resources/libsigar-x86-linux.so | Bin 0 -> 233385 bytes
.../resources/resources/libsigar-x86-solaris.so | Bin 0 -> 242880 bytes
.../resources/resources/sigar-amd64-winnt.dll | Bin 0 -> 402432 bytes
.../resources/resources/sigar-x86-winnt.dll | Bin 0 -> 266240 bytes
.../resources/resources/sigar-x86-winnt.lib | Bin 0 -> 99584 bytes
pom.xml | 7 +
.../src/clj/backtype/storm/daemon/nimbus.clj | 2 +-
.../metric/HttpForwardingMetricsConsumer.java | 81 ++++
.../metric/HttpForwardingMetricsServer.java | 119 ++++++
32 files changed, 799 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/c356c35a/LICENSE
----------------------------------------------------------------------
diff --git a/LICENSE b/LICENSE
index 8755d1b..5eedc94 100644
--- a/LICENSE
+++ b/LICENSE
@@ -202,6 +202,15 @@
limitations under the License.
-----------------------------------------------------------------------
+For Hyperic Sigar (external/storm-metrics/src/main/resources/resources)
+
+https://github.com/hyperic/sigar
+
+ Apache License
+ Version 2.0, January 2004
+ http://www.apache.org/licenses/
+
+-----------------------------------------------------------------------
For jQuery 1.11.1 (storm-core/src/ui/public/js/jquery-1.11.1.min.js)
http://git-wip-us.apache.org/repos/asf/storm/blob/c356c35a/examples/storm-starter/pom.xml
----------------------------------------------------------------------
diff --git a/examples/storm-starter/pom.xml b/examples/storm-starter/pom.xml
index 0f82c33..ac6472e 100644
--- a/examples/storm-starter/pom.xml
+++ b/examples/storm-starter/pom.xml
@@ -33,6 +33,10 @@
<dependencies>
<dependency>
+ <groupId>org.hdrhistogram</groupId>
+ <artifactId>HdrHistogram</artifactId>
+ </dependency>
+ <dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<scope>test</scope>
@@ -105,6 +109,11 @@
</dependency>
<dependency>
<groupId>org.apache.storm</groupId>
+ <artifactId>storm-metrics</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.storm</groupId>
<artifactId>storm-kafka</artifactId>
<version>${project.version}</version>
<scope>provided</scope>
http://git-wip-us.apache.org/repos/asf/storm/blob/c356c35a/examples/storm-starter/src/jvm/storm/starter/ThroughputVsLatency.java
----------------------------------------------------------------------
diff --git a/examples/storm-starter/src/jvm/storm/starter/ThroughputVsLatency.java b/examples/storm-starter/src/jvm/storm/starter/ThroughputVsLatency.java
new file mode 100644
index 0000000..0ef1256
--- /dev/null
+++ b/examples/storm-starter/src/jvm/storm/starter/ThroughputVsLatency.java
@@ -0,0 +1,376 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package storm.starter;
+
+import backtype.storm.Config;
+import backtype.storm.LocalCluster;
+import backtype.storm.StormSubmitter;
+import backtype.storm.metric.HttpForwardingMetricsServer;
+import backtype.storm.metric.HttpForwardingMetricsConsumer;
+import backtype.storm.metric.api.IMetric;
+import backtype.storm.metric.api.IMetricsConsumer.TaskInfo;
+import backtype.storm.metric.api.IMetricsConsumer.DataPoint;
+import backtype.storm.generated.*;
+import backtype.storm.spout.SpoutOutputCollector;
+import backtype.storm.task.TopologyContext;
+import backtype.storm.topology.BasicOutputCollector;
+import backtype.storm.topology.IRichBolt;
+import backtype.storm.topology.OutputFieldsDeclarer;
+import backtype.storm.topology.TopologyBuilder;
+import backtype.storm.topology.base.BaseBasicBolt;
+import backtype.storm.topology.base.BaseRichSpout;
+import backtype.storm.tuple.Fields;
+import backtype.storm.tuple.Tuple;
+import backtype.storm.tuple.Values;
+import backtype.storm.utils.NimbusClient;
+import backtype.storm.utils.Utils;
+import backtype.storm.StormSubmitter;
+
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Random;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.storm.metrics.hdrhistogram.HistogramMetric;
+import org.HdrHistogram.Histogram;
+
+/**
+ * WordCount but the spout goes at a predefined rate and we collect
+ * proper latency statistics.
+ */
+public class ThroughputVsLatency {
+ private static class SentWithTime {
+ public final String sentence;
+ public final long time;
+
+ SentWithTime(String sentence, long time) {
+ this.sentence = sentence;
+ this.time = time;
+ }
+ }
+
+ public static class C {
+ LocalCluster _local = null;
+ Nimbus.Client _client = null;
+
+ public C(Map conf) {
+ Map clusterConf = Utils.readStormConfig();
+ if (conf != null) {
+ clusterConf.putAll(conf);
+ }
+ Boolean isLocal = (Boolean)clusterConf.get("run.local");
+ if (isLocal != null && isLocal) {
+ _local = new LocalCluster();
+ } else {
+ _client = NimbusClient.getConfiguredClient(clusterConf).getClient();
+ }
+ }
+
+ public ClusterSummary getClusterInfo() throws Exception {
+ if (_local != null) {
+ return _local.getClusterInfo();
+ } else {
+ return _client.getClusterInfo();
+ }
+ }
+
+ public TopologyInfo getTopologyInfo(String id) throws Exception {
+ if (_local != null) {
+ return _local.getTopologyInfo(id);
+ } else {
+ return _client.getTopologyInfo(id);
+ }
+ }
+
+ public void killTopologyWithOpts(String name, KillOptions opts) throws Exception {
+ if (_local != null) {
+ _local.killTopologyWithOpts(name, opts);
+ } else {
+ _client.killTopologyWithOpts(name, opts);
+ }
+ }
+
+ public void submitTopology(String name, Map stormConf, StormTopology topology) throws Exception {
+ if (_local != null) {
+ _local.submitTopology(name, stormConf, topology);
+ } else {
+ StormSubmitter.submitTopology(name, stormConf, topology);
+ }
+ }
+ }
+
+ public static class FastRandomSentenceSpout extends BaseRichSpout {
+ static final String[] SENTENCES = new String[]{ "the cow jumped over the moon", "an apple a day keeps the doctor away",
+ "four score and seven years ago", "snow white and the seven dwarfs", "i am at two with nature" };
+
+ SpoutOutputCollector _collector;
+ long _periodNano;
+ long _emitAmount;
+ Random _rand;
+ long _nextEmitTime;
+ long _emitsLeft;
+ HistogramMetric _histo;
+
+ public FastRandomSentenceSpout(long ratePerSecond) {
+ if (ratePerSecond > 0) {
+ _periodNano = Math.max(1, 1000000000/ratePerSecond);
+ _emitAmount = Math.max(1, (long)((ratePerSecond / 1000000000.0) * _periodNano));
+ } else {
+ _periodNano = Long.MAX_VALUE - 1;
+ _emitAmount = 1;
+ }
+ }
+
+ @Override
+ public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
+ _collector = collector;
+ _rand = ThreadLocalRandom.current();
+ _nextEmitTime = System.nanoTime();
+ _emitsLeft = _emitAmount;
+ _histo = new HistogramMetric(3600000000000L, 3);
+ context.registerMetric("comp-lat-histo", _histo, 10); //Update every 10 seconds, so we are not too far behind
+ }
+
+ @Override
+ public void nextTuple() {
+ if (_emitsLeft <= 0 && _nextEmitTime <= System.nanoTime()) {
+ _emitsLeft = _emitAmount;
+ _nextEmitTime = _nextEmitTime + _periodNano;
+ }
+
+ if (_emitsLeft > 0) {
+ String sentence = SENTENCES[_rand.nextInt(SENTENCES.length)];
+ _collector.emit(new Values(sentence), new SentWithTime(sentence, _nextEmitTime - _periodNano));
+ _emitsLeft--;
+ }
+ }
+
+ @Override
+ public void ack(Object id) {
+ long end = System.nanoTime();
+ SentWithTime st = (SentWithTime)id;
+ _histo.recordValue(end-st.time);
+ }
+
+ @Override
+ public void fail(Object id) {
+ SentWithTime st = (SentWithTime)id;
+ _collector.emit(new Values(st.sentence), id);
+ }
+
+ @Override
+ public void declareOutputFields(OutputFieldsDeclarer declarer) {
+ declarer.declare(new Fields("sentence"));
+ }
+ }
+
+ public static class SplitSentence extends BaseBasicBolt {
+ @Override
+ public void execute(Tuple tuple, BasicOutputCollector collector) {
+ String sentence = tuple.getString(0);
+ for (String word: sentence.split("\\s+")) {
+ collector.emit(new Values(word, 1));
+ }
+ }
+
+ @Override
+ public void declareOutputFields(OutputFieldsDeclarer declarer) {
+ declarer.declare(new Fields("word", "count"));
+ }
+ }
+
+ public static class WordCount extends BaseBasicBolt {
+ Map<String, Integer> counts = new HashMap<String, Integer>();
+
+ @Override
+ public void execute(Tuple tuple, BasicOutputCollector collector) {
+ String word = tuple.getString(0);
+ Integer count = counts.get(word);
+ if (count == null)
+ count = 0;
+ count++;
+ counts.put(word, count);
+ collector.emit(new Values(word, count));
+ }
+
+ @Override
+ public void declareOutputFields(OutputFieldsDeclarer declarer) {
+ declarer.declare(new Fields("word", "count"));
+ }
+ }
+
+ private static final Histogram _histo = new Histogram(3600000000000L, 3);
+ private static final AtomicLong _systemCPU = new AtomicLong(0);
+ private static final AtomicLong _userCPU = new AtomicLong(0);
+ private static final AtomicLong _gcCount = new AtomicLong(0);
+ private static final AtomicLong _gcMs = new AtomicLong(0);
+
+ private static long _prev_acked = 0;
+ private static long _prev_uptime = 0;
+
+ public static void printMetrics(C client, String name) throws Exception {
+ ClusterSummary summary = client.getClusterInfo();
+ String id = null;
+ for (TopologySummary ts: summary.get_topologies()) {
+ if (name.equals(ts.get_name())) {
+ id = ts.get_id();
+ }
+ }
+ if (id == null) {
+ throw new Exception("Could not find a topology named "+name);
+ }
+ TopologyInfo info = client.getTopologyInfo(id);
+ int uptime = info.get_uptime_secs();
+ long acked = 0;
+ long failed = 0;
+ for (ExecutorSummary exec: info.get_executors()) {
+ if ("spout".equals(exec.get_component_id())) {
+ SpoutStats stats = exec.get_stats().get_specific().get_spout();
+ Map<String, Long> failedMap = stats.get_failed().get(":all-time");
+ Map<String, Long> ackedMap = stats.get_acked().get(":all-time");
+ for (String key: ackedMap.keySet()) {
+ if (failedMap != null) {
+ Long tmp = failedMap.get(key);
+ if (tmp != null) {
+ failed += tmp;
+ }
+ }
+ long ackVal = ackedMap.get(key);
+ acked += ackVal;
+ }
+ }
+ }
+ long ackedThisTime = acked - _prev_acked;
+ long thisTime = uptime - _prev_uptime;
+ long nnpct, nnnpct, min, max;
+ double mean, stddev;
+ synchronized(_histo) {
+ nnpct = _histo.getValueAtPercentile(99.0);
+ nnnpct = _histo.getValueAtPercentile(99.9);
+ min = _histo.getMinValue();
+ max = _histo.getMaxValue();
+ mean = _histo.getMean();
+ stddev = _histo.getStdDeviation();
+ _histo.reset();
+ }
+ long user = _userCPU.getAndSet(0);
+ long sys = _systemCPU.getAndSet(0);
+ long gc = _gcMs.getAndSet(0);
+ System.out.printf("uptime: %,4d acked: %,8d acked/sec: %,8.2f failed: %,8d " +
+ "99%%: %,15d 99.9%%: %,15d min: %,15d max: %,15d mean: %,15.2f stddev: %,15.2f user: %,10d sys: %,10d gc: %,10d\n",
+ uptime, ackedThisTime, (((double)ackedThisTime)/thisTime), failed, nnpct, nnnpct,
+ min, max, mean, stddev, user, sys, gc);
+ _prev_uptime = uptime;
+ _prev_acked = acked;
+ }
+
+ public static void kill(C client, String name) throws Exception {
+ KillOptions opts = new KillOptions();
+ opts.set_wait_secs(0);
+ client.killTopologyWithOpts(name, opts);
+ }
+
+ public static void main(String[] args) throws Exception {
+ long ratePerSecond = 500;
+ if (args != null && args.length > 0) {
+ ratePerSecond = Long.valueOf(args[0]);
+ }
+
+ int parallelism = 4;
+ if (args != null && args.length > 1) {
+ parallelism = Integer.valueOf(args[1]);
+ }
+
+ int numMins = 5;
+ if (args != null && args.length > 2) {
+ numMins = Integer.valueOf(args[2]);
+ }
+
+ String name = "wc-test";
+ if (args != null && args.length > 3) {
+ name = args[3];
+ }
+
+ Config conf = new Config();
+ HttpForwardingMetricsServer metricServer = new HttpForwardingMetricsServer(conf) {
+ @Override
+ public void handle(TaskInfo taskInfo, Collection<DataPoint> dataPoints) {
+ for (DataPoint dp: dataPoints) {
+ if ("comp-lat-histo".equals(dp.name) && dp.value instanceof Histogram) {
+ synchronized(_histo) {
+ _histo.add((Histogram)dp.value);
+ }
+ } else if ("CPU".equals(dp.name) && dp.value instanceof Map) {
+ Map<Object, Object> m = (Map<Object, Object>)dp.value;
+ Object sys = m.get("sys-ms");
+ if (sys instanceof Number) {
+ _systemCPU.getAndAdd(((Number)sys).longValue());
+ }
+ Object user = m.get("user-ms");
+ if (user instanceof Number) {
+ _userCPU.getAndAdd(((Number)user).longValue());
+ }
+ } else if (dp.name.startsWith("GC/") && dp.value instanceof Map) {
+ Map<Object, Object> m = (Map<Object, Object>)dp.value;
+ Object count = m.get("count");
+ if (count instanceof Number) {
+ _gcCount.getAndAdd(((Number)count).longValue());
+ }
+ Object time = m.get("timeMs");
+ if (time instanceof Number) {
+ _gcMs.getAndAdd(((Number)time).longValue());
+ }
+ }
+ }
+ }
+ };
+
+ metricServer.serve();
+ String url = metricServer.getUrl();
+
+ conf.setNumWorkers(parallelism);
+ conf.registerMetricsConsumer(backtype.storm.metric.HttpForwardingMetricsConsumer.class, url, 1);
+ Map<String, String> workerMetrics = new HashMap<String, String>();
+ workerMetrics.put("CPU", "org.apache.storm.metrics.sigar.CPUMetric");
+ conf.put(Config.TOPOLOGY_WORKER_METRICS, workerMetrics);
+ conf.put(Config.TOPOLOGY_BUILTIN_METRICS_BUCKET_SIZE_SECS, 10);
+
+ TopologyBuilder builder = new TopologyBuilder();
+
+ int numEach = 4 * parallelism;
+ builder.setSpout("spout", new FastRandomSentenceSpout(ratePerSecond/numEach), numEach);
+
+ builder.setBolt("split", new SplitSentence(), numEach).shuffleGrouping("spout");
+ builder.setBolt("count", new WordCount(), numEach).fieldsGrouping("split", new Fields("word"));
+
+ C cluster = new C(conf);
+ try {
+ cluster.submitTopology(name, conf, builder.createTopology());
+
+ for (int i = 0; i < numMins * 2; i++) {
+ Thread.sleep(30 * 1000);
+ printMetrics(cluster, name);
+ }
+ } finally {
+ kill(cluster, name);
+ }
+ System.exit(0);
+ }
+}
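Note on the pacing arithmetic in ThroughputVsLatency.java above: the spout constructor turns a per-task rate into a period in nanoseconds and a number of tuples to emit per period, and main() divides the requested rate across 4 * parallelism spout tasks with integer division. The following is a minimal standalone sketch of that arithmetic, not part of the commit. The class name RatePacing and its method names are hypothetical; only the formulas are taken from the diff.

```java
// Standalone sketch of the pacing arithmetic used by FastRandomSentenceSpout
// and main() in the diff above. Class and method names are hypothetical.
final class RatePacing {

    /** Per-task rate: main() uses 4 * parallelism spout tasks and integer division. */
    static long perTaskRate(long ratePerSecond, int parallelism) {
        int numEach = 4 * parallelism;
        return ratePerSecond / numEach;
    }

    /** Nanoseconds between refills of the emit budget for one spout task. */
    static long periodNanos(long ratePerSecond) {
        if (ratePerSecond <= 0) {
            return Long.MAX_VALUE - 1; // same fallback as the constructor in the diff
        }
        return Math.max(1, 1000000000 / ratePerSecond);
    }

    /** Tuples a spout task may emit per period. */
    static long emitAmount(long ratePerSecond) {
        if (ratePerSecond <= 0) {
            return 1; // same fallback as the constructor in the diff
        }
        long period = periodNanos(ratePerSecond);
        return Math.max(1, (long) ((ratePerSecond / 1000000000.0) * period));
    }

    public static void main(String[] args) {
        long requested = 500;   // default ratePerSecond in main()
        int parallelism = 4;    // default parallelism in main()
        long perTask = perTaskRate(requested, parallelism);
        System.out.println("per-task rate:  " + perTask);
        System.out.println("aggregate rate: " + perTask * 4 * parallelism);
        System.out.println("period (ns):    " + periodNanos(perTask));
        System.out.println("emit amount:    " + emitAmount(perTask));
    }
}
```

Because of the integer division, the aggregate rate can come out slightly below the requested one when the request is not a multiple of the number of spout tasks. For rates above one tuple per nanosecond the period clamps to 1 ns and the emit amount grows instead.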
http://git-wip-us.apache.org/repos/asf/storm/blob/c356c35a/external/storm-metrics/pom.xml
----------------------------------------------------------------------
diff --git a/external/storm-metrics/pom.xml b/external/storm-metrics/pom.xml
new file mode 100644
index 0000000..e8b6950
--- /dev/null
+++ b/external/storm-metrics/pom.xml
@@ -0,0 +1,58 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <parent>
+ <artifactId>storm</artifactId>
+ <groupId>org.apache.storm</groupId>
+ <version>0.11.0-SNAPSHOT</version>
+ <relativePath>../../pom.xml</relativePath>
+ </parent>
+
+ <groupId>org.apache.storm</groupId>
+ <artifactId>storm-metrics</artifactId>
+ <packaging>jar</packaging>
+
+ <name>storm-metrics</name>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.hdrhistogram</groupId>
+ <artifactId>HdrHistogram</artifactId>
+ </dependency>
+ <dependency>
+ <!-- DO NOT CHANGE THE VERSION OF THIS PACKAGE WITHOUT UPDATING THE NATIVE LIBRARIES in src/main/resources/resources -->
+ <groupId>org.fusesource</groupId>
+ <artifactId>sigar</artifactId>
+ <version>1.6.4</version>
+ <exclusions>
+ <exclusion>
+ <groupId>log4j</groupId>
+ <artifactId>log4j</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.storm</groupId>
+ <artifactId>storm-core</artifactId>
+ <version>${project.version}</version>
+ <!-- keep storm out of the jar-with-dependencies -->
+ <scope>provided</scope>
+ </dependency>
+ </dependencies>
+</project>
http://git-wip-us.apache.org/repos/asf/storm/blob/c356c35a/external/storm-metrics/src/main/java/org/apache/storm/metrics/hdrhistogram/HistogramMetric.java
----------------------------------------------------------------------
diff --git a/external/storm-metrics/src/main/java/org/apache/storm/metrics/hdrhistogram/HistogramMetric.java b/external/storm-metrics/src/main/java/org/apache/storm/metrics/hdrhistogram/HistogramMetric.java
new file mode 100644
index 0000000..4adc500
--- /dev/null
+++ b/external/storm-metrics/src/main/java/org/apache/storm/metrics/hdrhistogram/HistogramMetric.java
@@ -0,0 +1,79 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.metrics.hdrhistogram;
+
+import backtype.storm.metric.api.IMetric;
+import org.HdrHistogram.Histogram;
+
+/**
+ * A metric wrapping an HdrHistogram.
+ */
+public class HistogramMetric implements IMetric {
+ private final Histogram _histo;
+
+
+ public HistogramMetric(final int numberOfSignificantValueDigits) {
+ this(null, numberOfSignificantValueDigits);
+ }
+
+ public HistogramMetric(Long highestTrackableValue, final int numberOfSignificantValueDigits) {
+ this(null, highestTrackableValue, numberOfSignificantValueDigits);
+ }
+
+ /**
+ * (From the Constructor of Histogram)
+ * Construct a Histogram given the Lowest and Highest values to be tracked and a number of significant
+ * decimal digits. Providing a lowestDiscernibleValue is useful in situations where the units used
+ * for the histogram's values are much smaller than the minimal accuracy required. E.g. when tracking
+ * time values stated in nanosecond units, where the minimal accuracy required is a microsecond, the
+ * proper value for lowestDiscernibleValue would be 1000.
+ *
+ * @param lowestDiscernibleValue The lowest value that can be discerned (distinguished from 0) by the
+ * histogram. Must be a positive integer that is {@literal >=} 1. May be
+ * internally rounded down to nearest power of 2
+ * (if null 1 is used).
+ * @param highestTrackableValue The highest value to be tracked by the histogram. Must be a positive
+ * integer that is {@literal >=} (2 * lowestDiscernibleValue).
+ * (if null 2 * lowestDiscernibleValue is used and auto-resize is enabled)
+ * @param numberOfSignificantValueDigits Specifies the precision to use. This is the number of significant
+ * decimal digits to which the histogram will maintain value resolution
+ * and separation. Must be a non-negative integer between 0 and 5.
+ */
+ public HistogramMetric(Long lowestDiscernibleValue, Long highestTrackableValue,
+ final int numberOfSignificantValueDigits) {
+ boolean autoResize = false;
+ if (lowestDiscernibleValue == null) lowestDiscernibleValue = 1L;
+ if (highestTrackableValue == null) {
+ highestTrackableValue = 2 * lowestDiscernibleValue;
+ autoResize = true;
+ }
+ _histo = new Histogram(lowestDiscernibleValue, highestTrackableValue, numberOfSignificantValueDigits);
+ if (autoResize) _histo.setAutoResize(true);
+ }
+
+ public void recordValue(long val) {
+ _histo.recordValue(val);
+ }
+
+ @Override
+ public Object getValueAndReset() {
+ Histogram copy = _histo.copy();
+ _histo.reset();
+ return copy;
+ }
+}
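Note on getValueAndReset() in HistogramMetric.java above: the method hands back a copy of the histogram and then clears the live one, so each reporting interval carries only the values recorded during that interval. The sketch below shows the same copy-then-reset pattern using only the standard library. SnapshotCounter is a hypothetical stand-in with fixed buckets, not the HdrHistogram API.

```java
import java.util.Arrays;

// Hypothetical stand-in for a histogram: a fixed array of bucket counts.
// It illustrates the copy-then-reset pattern only, not HdrHistogram itself.
final class SnapshotCounter {
    private final long[] counts;

    SnapshotCounter(int buckets) {
        counts = new long[buckets];
    }

    /** Record one observation in the given bucket. */
    void record(int bucket) {
        counts[bucket]++;
    }

    /** Return a copy of the counts for this interval, then clear the live counts. */
    long[] getValueAndReset() {
        long[] copy = Arrays.copyOf(counts, counts.length);
        Arrays.fill(counts, 0L);
        return copy;
    }

    public static void main(String[] args) {
        SnapshotCounter c = new SnapshotCounter(3);
        c.record(0);
        c.record(2);
        c.record(2);
        System.out.println(Arrays.toString(c.getValueAndReset()));
        c.record(1);
        System.out.println(Arrays.toString(c.getValueAndReset()));
    }
}
```

Returning a copy rather than the live object means a consumer that merges snapshots, as the test driver does with Histogram.add, never sees later recordings leak into an earlier interval.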
http://git-wip-us.apache.org/repos/asf/storm/blob/c356c35a/external/storm-metrics/src/main/java/org/apache/storm/metrics/sigar/CPUMetric.java
----------------------------------------------------------------------
diff --git a/external/storm-metrics/src/main/java/org/apache/storm/metrics/sigar/CPUMetric.java b/external/storm-metrics/src/main/java/org/apache/storm/metrics/sigar/CPUMetric.java
new file mode 100644
index 0000000..a3addc9
--- /dev/null
+++ b/external/storm-metrics/src/main/java/org/apache/storm/metrics/sigar/CPUMetric.java
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.metrics.sigar;
+
+import org.hyperic.sigar.Sigar;
+import org.hyperic.sigar.ProcCpu;
+
+import backtype.storm.metric.api.IMetric;
+
+import java.util.HashMap;
+
+/**
+ * A metric using Sigar to get User and System CPU utilization for a worker.
+ */
+public class CPUMetric implements IMetric {
+ private long _prevUser = 0;
+ private long _prevSys = 0;
+ private final Sigar _sigar;
+ private final long _pid;
+
+ public CPUMetric() {
+ _sigar = new Sigar();
+ _pid = _sigar.getPid();
+ }
+
+ @Override
+ public Object getValueAndReset() {
+ try {
+ ProcCpu cpu = _sigar.getProcCpu(_pid);
+ long userTotal = cpu.getUser();
+ long sysTotal = cpu.getSys();
+ long user = userTotal - _prevUser;
+ long sys = sysTotal - _prevSys;
+ _prevUser = userTotal;
+ _prevSys = sysTotal;
+
+ HashMap<String, Long> ret = new HashMap<String, Long>();
+ ret.put("user-ms", user);
+ ret.put("sys-ms", sys);
+ return ret;
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+}
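Note on CPUMetric.java above: the metric keeps the previous cumulative user and system totals and reports the difference on each call, so the driver can sum the per-interval values. The sketch below isolates that delta bookkeeping. CumulativeDelta is a hypothetical helper, and the totals fed to it are made-up sample numbers rather than Sigar readings.

```java
// Hypothetical helper showing the cumulative-to-delta bookkeeping that
// CPUMetric performs for its user and system totals.
final class CumulativeDelta {
    private long prev = 0; // like _prevUser and _prevSys, starts at zero

    /** Given the latest cumulative total, return the amount added since the last call. */
    long delta(long total) {
        long d = total - prev;
        prev = total;
        return d;
    }

    public static void main(String[] args) {
        CumulativeDelta userMs = new CumulativeDelta();
        long[] sampleTotals = {120, 200, 200}; // made-up cumulative readings
        for (long total : sampleTotals) {
            System.out.println("total=" + total + " delta=" + userMs.delta(total));
        }
    }
}
```

Since the previous value starts at zero, the first report includes everything accumulated before the first call.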
http://git-wip-us.apache.org/repos/asf/storm/blob/c356c35a/external/storm-metrics/src/main/resources/resources/libsigar-amd64-freebsd-6.so
----------------------------------------------------------------------
diff --git a/external/storm-metrics/src/main/resources/resources/libsigar-amd64-freebsd-6.so b/external/storm-metrics/src/main/resources/resources/libsigar-amd64-freebsd-6.so
new file mode 100644
index 0000000..3e94f0d
Binary files /dev/null and b/external/storm-metrics/src/main/resources/resources/libsigar-amd64-freebsd-6.so differ
http://git-wip-us.apache.org/repos/asf/storm/blob/c356c35a/external/storm-metrics/src/main/resources/resources/libsigar-amd64-linux.so
----------------------------------------------------------------------
diff --git a/external/storm-metrics/src/main/resources/resources/libsigar-amd64-linux.so b/external/storm-metrics/src/main/resources/resources/libsigar-amd64-linux.so
new file mode 100644
index 0000000..5a2e4c2
Binary files /dev/null and b/external/storm-metrics/src/main/resources/resources/libsigar-amd64-linux.so differ
http://git-wip-us.apache.org/repos/asf/storm/blob/c356c35a/external/storm-metrics/src/main/resources/resources/libsigar-amd64-solaris.so
----------------------------------------------------------------------
diff --git a/external/storm-metrics/src/main/resources/resources/libsigar-amd64-solaris.so b/external/storm-metrics/src/main/resources/resources/libsigar-amd64-solaris.so
new file mode 100644
index 0000000..6396482
Binary files /dev/null and b/external/storm-metrics/src/main/resources/resources/libsigar-amd64-solaris.so differ
http://git-wip-us.apache.org/repos/asf/storm/blob/c356c35a/external/storm-metrics/src/main/resources/resources/libsigar-ia64-hpux-11.sl
----------------------------------------------------------------------
diff --git a/external/storm-metrics/src/main/resources/resources/libsigar-ia64-hpux-11.sl b/external/storm-metrics/src/main/resources/resources/libsigar-ia64-hpux-11.sl
new file mode 100755
index 0000000..d92ea4a
Binary files /dev/null and b/external/storm-metrics/src/main/resources/resources/libsigar-ia64-hpux-11.sl differ
http://git-wip-us.apache.org/repos/asf/storm/blob/c356c35a/external/storm-metrics/src/main/resources/resources/libsigar-ia64-linux.so
----------------------------------------------------------------------
diff --git a/external/storm-metrics/src/main/resources/resources/libsigar-ia64-linux.so b/external/storm-metrics/src/main/resources/resources/libsigar-ia64-linux.so
new file mode 100644
index 0000000..2bd2fc8
Binary files /dev/null and b/external/storm-metrics/src/main/resources/resources/libsigar-ia64-linux.so differ
http://git-wip-us.apache.org/repos/asf/storm/blob/c356c35a/external/storm-metrics/src/main/resources/resources/libsigar-pa-hpux-11.sl
----------------------------------------------------------------------
diff --git a/external/storm-metrics/src/main/resources/resources/libsigar-pa-hpux-11.sl b/external/storm-metrics/src/main/resources/resources/libsigar-pa-hpux-11.sl
new file mode 100755
index 0000000..0dfd8a1
Binary files /dev/null and b/external/storm-metrics/src/main/resources/resources/libsigar-pa-hpux-11.sl differ
http://git-wip-us.apache.org/repos/asf/storm/blob/c356c35a/external/storm-metrics/src/main/resources/resources/libsigar-ppc-aix-5.so
----------------------------------------------------------------------
diff --git a/external/storm-metrics/src/main/resources/resources/libsigar-ppc-aix-5.so b/external/storm-metrics/src/main/resources/resources/libsigar-ppc-aix-5.so
new file mode 100644
index 0000000..7d4b519
Binary files /dev/null and b/external/storm-metrics/src/main/resources/resources/libsigar-ppc-aix-5.so differ
http://git-wip-us.apache.org/repos/asf/storm/blob/c356c35a/external/storm-metrics/src/main/resources/resources/libsigar-ppc-linux.so
----------------------------------------------------------------------
diff --git a/external/storm-metrics/src/main/resources/resources/libsigar-ppc-linux.so b/external/storm-metrics/src/main/resources/resources/libsigar-ppc-linux.so
new file mode 100644
index 0000000..4394b1b
Binary files /dev/null and b/external/storm-metrics/src/main/resources/resources/libsigar-ppc-linux.so differ
http://git-wip-us.apache.org/repos/asf/storm/blob/c356c35a/external/storm-metrics/src/main/resources/resources/libsigar-ppc64-aix-5.so
----------------------------------------------------------------------
diff --git a/external/storm-metrics/src/main/resources/resources/libsigar-ppc64-aix-5.so b/external/storm-metrics/src/main/resources/resources/libsigar-ppc64-aix-5.so
new file mode 100644
index 0000000..35fd828
Binary files /dev/null and b/external/storm-metrics/src/main/resources/resources/libsigar-ppc64-aix-5.so differ
http://git-wip-us.apache.org/repos/asf/storm/blob/c356c35a/external/storm-metrics/src/main/resources/resources/libsigar-ppc64-linux.so
----------------------------------------------------------------------
diff --git a/external/storm-metrics/src/main/resources/resources/libsigar-ppc64-linux.so b/external/storm-metrics/src/main/resources/resources/libsigar-ppc64-linux.so
new file mode 100644
index 0000000..a1ba252
Binary files /dev/null and b/external/storm-metrics/src/main/resources/resources/libsigar-ppc64-linux.so differ
http://git-wip-us.apache.org/repos/asf/storm/blob/c356c35a/external/storm-metrics/src/main/resources/resources/libsigar-s390x-linux.so
----------------------------------------------------------------------
diff --git a/external/storm-metrics/src/main/resources/resources/libsigar-s390x-linux.so b/external/storm-metrics/src/main/resources/resources/libsigar-s390x-linux.so
new file mode 100644
index 0000000..c275f4a
Binary files /dev/null and b/external/storm-metrics/src/main/resources/resources/libsigar-s390x-linux.so differ
http://git-wip-us.apache.org/repos/asf/storm/blob/c356c35a/external/storm-metrics/src/main/resources/resources/libsigar-sparc-solaris.so
----------------------------------------------------------------------
diff --git a/external/storm-metrics/src/main/resources/resources/libsigar-sparc-solaris.so b/external/storm-metrics/src/main/resources/resources/libsigar-sparc-solaris.so
new file mode 100644
index 0000000..aa847d2
Binary files /dev/null and b/external/storm-metrics/src/main/resources/resources/libsigar-sparc-solaris.so differ
http://git-wip-us.apache.org/repos/asf/storm/blob/c356c35a/external/storm-metrics/src/main/resources/resources/libsigar-sparc64-solaris.so
----------------------------------------------------------------------
diff --git a/external/storm-metrics/src/main/resources/resources/libsigar-sparc64-solaris.so b/external/storm-metrics/src/main/resources/resources/libsigar-sparc64-solaris.so
new file mode 100644
index 0000000..6c4fe80
Binary files /dev/null and b/external/storm-metrics/src/main/resources/resources/libsigar-sparc64-solaris.so differ
http://git-wip-us.apache.org/repos/asf/storm/blob/c356c35a/external/storm-metrics/src/main/resources/resources/libsigar-universal-macosx.dylib
----------------------------------------------------------------------
diff --git a/external/storm-metrics/src/main/resources/resources/libsigar-universal-macosx.dylib b/external/storm-metrics/src/main/resources/resources/libsigar-universal-macosx.dylib
new file mode 100644
index 0000000..27ab107
Binary files /dev/null and b/external/storm-metrics/src/main/resources/resources/libsigar-universal-macosx.dylib differ
http://git-wip-us.apache.org/repos/asf/storm/blob/c356c35a/external/storm-metrics/src/main/resources/resources/libsigar-universal64-macosx.dylib
----------------------------------------------------------------------
diff --git a/external/storm-metrics/src/main/resources/resources/libsigar-universal64-macosx.dylib b/external/storm-metrics/src/main/resources/resources/libsigar-universal64-macosx.dylib
new file mode 100644
index 0000000..0c721fe
Binary files /dev/null and b/external/storm-metrics/src/main/resources/resources/libsigar-universal64-macosx.dylib differ
http://git-wip-us.apache.org/repos/asf/storm/blob/c356c35a/external/storm-metrics/src/main/resources/resources/libsigar-x86-freebsd-5.so
----------------------------------------------------------------------
diff --git a/external/storm-metrics/src/main/resources/resources/libsigar-x86-freebsd-5.so b/external/storm-metrics/src/main/resources/resources/libsigar-x86-freebsd-5.so
new file mode 100644
index 0000000..8c50c61
Binary files /dev/null and b/external/storm-metrics/src/main/resources/resources/libsigar-x86-freebsd-5.so differ
http://git-wip-us.apache.org/repos/asf/storm/blob/c356c35a/external/storm-metrics/src/main/resources/resources/libsigar-x86-freebsd-6.so
----------------------------------------------------------------------
diff --git a/external/storm-metrics/src/main/resources/resources/libsigar-x86-freebsd-6.so b/external/storm-metrics/src/main/resources/resources/libsigar-x86-freebsd-6.so
new file mode 100644
index 0000000..f080027
Binary files /dev/null and b/external/storm-metrics/src/main/resources/resources/libsigar-x86-freebsd-6.so differ
http://git-wip-us.apache.org/repos/asf/storm/blob/c356c35a/external/storm-metrics/src/main/resources/resources/libsigar-x86-linux.so
----------------------------------------------------------------------
diff --git a/external/storm-metrics/src/main/resources/resources/libsigar-x86-linux.so b/external/storm-metrics/src/main/resources/resources/libsigar-x86-linux.so
new file mode 100644
index 0000000..a0b64ed
Binary files /dev/null and b/external/storm-metrics/src/main/resources/resources/libsigar-x86-linux.so differ
http://git-wip-us.apache.org/repos/asf/storm/blob/c356c35a/external/storm-metrics/src/main/resources/resources/libsigar-x86-solaris.so
----------------------------------------------------------------------
diff --git a/external/storm-metrics/src/main/resources/resources/libsigar-x86-solaris.so b/external/storm-metrics/src/main/resources/resources/libsigar-x86-solaris.so
new file mode 100644
index 0000000..c6452e5
Binary files /dev/null and b/external/storm-metrics/src/main/resources/resources/libsigar-x86-solaris.so differ
http://git-wip-us.apache.org/repos/asf/storm/blob/c356c35a/external/storm-metrics/src/main/resources/resources/sigar-amd64-winnt.dll
----------------------------------------------------------------------
diff --git a/external/storm-metrics/src/main/resources/resources/sigar-amd64-winnt.dll b/external/storm-metrics/src/main/resources/resources/sigar-amd64-winnt.dll
new file mode 100644
index 0000000..1ec8a03
Binary files /dev/null and b/external/storm-metrics/src/main/resources/resources/sigar-amd64-winnt.dll differ
http://git-wip-us.apache.org/repos/asf/storm/blob/c356c35a/external/storm-metrics/src/main/resources/resources/sigar-x86-winnt.dll
----------------------------------------------------------------------
diff --git a/external/storm-metrics/src/main/resources/resources/sigar-x86-winnt.dll b/external/storm-metrics/src/main/resources/resources/sigar-x86-winnt.dll
new file mode 100644
index 0000000..6afdc01
Binary files /dev/null and b/external/storm-metrics/src/main/resources/resources/sigar-x86-winnt.dll differ
http://git-wip-us.apache.org/repos/asf/storm/blob/c356c35a/external/storm-metrics/src/main/resources/resources/sigar-x86-winnt.lib
----------------------------------------------------------------------
diff --git a/external/storm-metrics/src/main/resources/resources/sigar-x86-winnt.lib b/external/storm-metrics/src/main/resources/resources/sigar-x86-winnt.lib
new file mode 100644
index 0000000..04924a1
Binary files /dev/null and b/external/storm-metrics/src/main/resources/resources/sigar-x86-winnt.lib differ
http://git-wip-us.apache.org/repos/asf/storm/blob/c356c35a/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 7f525bb..9900623 100644
--- a/pom.xml
+++ b/pom.xml
@@ -213,6 +213,7 @@
<jackson.version>2.3.1</jackson.version>
<thrift.version>0.9.2</thrift.version>
<junit.version>4.11</junit.version>
+ <hdrhistogram.version>2.1.7</hdrhistogram.version>
</properties>
<modules>
@@ -232,6 +233,7 @@
<module>external/flux</module>
<module>external/storm-elasticsearch</module>
<module>external/storm-solr</module>
+ <module>external/storm-metrics</module>
<module>examples/storm-starter</module>
</modules>
@@ -329,6 +331,11 @@
<dependencyManagement>
<dependencies>
<dependency>
+ <groupId>org.hdrhistogram</groupId>
+ <artifactId>HdrHistogram</artifactId>
+ <version>${hdrhistogram.version}</version>
+ </dependency>
+ <dependency>
<groupId>org.clojure</groupId>
<artifactId>clojure</artifactId>
<version>${clojure.version}</version>
http://git-wip-us.apache.org/repos/asf/storm/blob/c356c35a/storm-core/src/clj/backtype/storm/daemon/nimbus.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/backtype/storm/daemon/nimbus.clj b/storm-core/src/clj/backtype/storm/daemon/nimbus.clj
index eee7bac..54f787e 100644
--- a/storm-core/src/clj/backtype/storm/daemon/nimbus.clj
+++ b/storm-core/src/clj/backtype/storm/daemon/nimbus.clj
@@ -1535,7 +1535,7 @@
(when-let [sched-status (.get @(:id->sched-status nimbus) storm-id)] (.set_sched_status topo-info sched-status))
(when-let [component->debug (:component->debug base)]
(.set_component_debug topo-info (map-val converter/thriftify-debugoptions component->debug)))
- (.set_replication_count topo-info (.getReplicationCount (:code-distributor nimbus) storm-id))
+ (.set_replication_count topo-info (if (:code-distributor nimbus) (.getReplicationCount (:code-distributor nimbus) storm-id) 1))
topo-info
))
http://git-wip-us.apache.org/repos/asf/storm/blob/c356c35a/storm-core/src/jvm/backtype/storm/metric/HttpForwardingMetricsConsumer.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/metric/HttpForwardingMetricsConsumer.java b/storm-core/src/jvm/backtype/storm/metric/HttpForwardingMetricsConsumer.java
new file mode 100644
index 0000000..77fe49c
--- /dev/null
+++ b/storm-core/src/jvm/backtype/storm/metric/HttpForwardingMetricsConsumer.java
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package backtype.storm.metric;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Map;
+import java.net.URL;
+import java.net.URLConnection;
+import java.net.HttpURLConnection;
+
+import com.esotericsoftware.kryo.io.Output;
+import backtype.storm.serialization.KryoValuesSerializer;
+
+import backtype.storm.metric.api.IMetricsConsumer;
+import backtype.storm.task.IErrorReporter;
+import backtype.storm.task.TopologyContext;
+
+/**
+ * Listens for all metrics and POSTs them serialized to a configured URL
+ *
+ * To use, add this to your topology's configuration:
+ * conf.registerMetricsConsumer(backtype.storm.metric.HttpForwardingMetricsConsumer.class, "http://example.com:8080/metrics/my-topology/", 1);
+ *
+ * The body of the post is data serialized using backtype.storm.serialization.KryoValuesSerializer, with the data passed in
+ * as a list of [TaskInfo, Collection<DataPoint>]. More things may be appended to the end of the list in the future.
+ *
+ * The values can be deserialized using the backtype.storm.serialization.KryoValuesDeserializer, and a
+ * correct config + classpath.
+ */
+public class HttpForwardingMetricsConsumer implements IMetricsConsumer {
+ private transient URL _url;
+ private transient IErrorReporter _errorReporter;
+ private transient KryoValuesSerializer _serializer;
+
+ @Override
+ public void prepare(Map stormConf, Object registrationArgument, TopologyContext context, IErrorReporter errorReporter) {
+ try {
+ _url = new URL((String)registrationArgument);
+ _errorReporter = errorReporter;
+ _serializer = new KryoValuesSerializer(stormConf);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Override
+ public void handleDataPoints(TaskInfo taskInfo, Collection<DataPoint> dataPoints) {
+ try {
+ HttpURLConnection con = (HttpURLConnection)_url.openConnection();
+ con.setRequestMethod("POST");
+ con.setDoOutput(true);
+ Output out = new Output(con.getOutputStream());
+ _serializer.serializeInto(Arrays.asList(taskInfo, dataPoints), out);
+ out.flush();
+ out.close();
+ //The request is not actually sent until the response is requested
+ int response = con.getResponseCode();
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Override
+ public void cleanup() { }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/c356c35a/storm-core/src/jvm/backtype/storm/metric/HttpForwardingMetricsServer.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/metric/HttpForwardingMetricsServer.java b/storm-core/src/jvm/backtype/storm/metric/HttpForwardingMetricsServer.java
new file mode 100644
index 0000000..6441a39
--- /dev/null
+++ b/storm-core/src/jvm/backtype/storm/metric/HttpForwardingMetricsServer.java
@@ -0,0 +1,119 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package backtype.storm.metric;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Map;
+import java.util.List;
+import java.net.ServerSocket;
+import java.net.InetAddress;
+
+import javax.servlet.http.HttpServlet;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+import javax.servlet.ServletException;
+
+import backtype.storm.metric.api.IMetricsConsumer.TaskInfo;
+import backtype.storm.metric.api.IMetricsConsumer.DataPoint;
+
+import com.esotericsoftware.kryo.io.Input;
+import backtype.storm.serialization.KryoValuesDeserializer;
+import backtype.storm.utils.Utils;
+
+import org.eclipse.jetty.server.Server;
+import org.eclipse.jetty.servlet.ServletContextHandler;
+import org.eclipse.jetty.servlet.ServletHolder;
+
+/**
+ * A server that can listen for metrics from the HttpForwardingMetricsConsumer.
+ */
+public abstract class HttpForwardingMetricsServer {
+ private Map _conf;
+ private Server _server = null;
+ private int _port = -1;
+ private String _url = null;
+
+ ThreadLocal<KryoValuesDeserializer> _des = new ThreadLocal<KryoValuesDeserializer>() {
+ @Override
+ protected KryoValuesDeserializer initialValue() {
+ return new KryoValuesDeserializer(_conf);
+ }
+ };
+
+ private class MetricsCollectionServlet extends HttpServlet
+ {
+ protected void doPost(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException
+ {
+ Input in = new Input(request.getInputStream());
+ List<Object> metrics = _des.get().deserializeFrom(in);
+ handle((TaskInfo)metrics.get(0), (Collection<DataPoint>)metrics.get(1));
+ response.setStatus(HttpServletResponse.SC_OK);
+ }
+ }
+
+ public HttpForwardingMetricsServer(Map conf) {
+ _conf = Utils.readStormConfig();
+ if (conf != null) {
+ _conf.putAll(conf);
+ }
+ }
+
+ //Implementations must be thread safe: the servlet may call this from several request threads at once
+ public abstract void handle(TaskInfo taskInfo, Collection<DataPoint> dataPoints);
+
+ public void serve(Integer port) {
+ try {
+ if (_server != null) throw new RuntimeException("The server is already running");
+
+ if (port == null || port <= 0) {
+ ServerSocket s = new ServerSocket(0);
+ port = s.getLocalPort();
+ s.close();
+ }
+ _server = new Server(port);
+ _port = port;
+ _url = "http://"+InetAddress.getLocalHost().getHostName()+":"+_port+"/";
+
+ ServletContextHandler context = new ServletContextHandler(ServletContextHandler.SESSIONS);
+ context.setContextPath("/");
+ _server.setHandler(context);
+
+ context.addServlet(new ServletHolder(new MetricsCollectionServlet()),"/*");
+
+ _server.start();
+ } catch (RuntimeException e) {
+ throw e;
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ public void serve() {
+ serve(null);
+ }
+
+ public int getPort() {
+ return _port;
+ }
+
+ public String getUrl() {
+ return _url;
+ }
+}
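When serve() is called without a port, the server above opens a ServerSocket on port 0, reads the port the OS assigned, closes the socket, and then starts Jetty on that port. A minimal sketch of that port-picking step, using only the JDK, follows. The class and method names (FreePortSketch, pickFreePort) are hypothetical and not part of the commit. Note that another process could claim the port between the close and the later bind, so this pattern is best kept to test tooling.

```java
import java.io.IOException;
import java.net.ServerSocket;

// Hypothetical helper, not part of the commit: isolates the
// "bind to port 0, read the assigned port, release it" step.
class FreePortSketch {
    // Binding to port 0 asks the OS for any currently unused port.
    // try-with-resources closes the socket so the caller can rebind it.
    static int pickFreePort() throws IOException {
        try (ServerSocket s = new ServerSocket(0)) {
            return s.getLocalPort();
        }
    }

    public static void main(String[] args) throws IOException {
        System.out.println("free port: " + pickFreePort());
    }
}
```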
[2/5] storm git commit: Added in support for memory to test.
Posted by bo...@apache.org.
Added in support for memory to test.
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/4c2cb4f1
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/4c2cb4f1
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/4c2cb4f1
Branch: refs/heads/master
Commit: 4c2cb4f165bf0c5e6167e7bd2ad4e157ed5be7eb
Parents: c356c35
Author: Robert (Bobby) Evans <ev...@yahoo-inc.com>
Authored: Mon Oct 19 08:10:42 2015 -0500
Committer: Robert (Bobby) Evans <ev...@yahoo-inc.com>
Committed: Mon Oct 19 08:10:42 2015 -0500
----------------------------------------------------------------------
.../jvm/storm/starter/ThroughputVsLatency.java | 49 ++++++++++++++++++--
1 file changed, 46 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/4c2cb4f1/examples/storm-starter/src/jvm/storm/starter/ThroughputVsLatency.java
----------------------------------------------------------------------
diff --git a/examples/storm-starter/src/jvm/storm/starter/ThroughputVsLatency.java b/examples/storm-starter/src/jvm/storm/starter/ThroughputVsLatency.java
index 0ef1256..a9070ae 100644
--- a/examples/storm-starter/src/jvm/storm/starter/ThroughputVsLatency.java
+++ b/examples/storm-starter/src/jvm/storm/starter/ThroughputVsLatency.java
@@ -45,6 +45,7 @@ import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.Random;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicLong;
@@ -216,11 +217,38 @@ public class ThroughputVsLatency {
}
}
+ private static class MemMeasure {
+ private long _mem = 0;
+ private long _time = 0;
+
+ public synchronized void update(long mem) {
+ _mem = mem;
+ _time = System.currentTimeMillis();
+ }
+
+ public synchronized long get() {
+ return isExpired() ? 0L : _mem;
+ }
+
+ public synchronized boolean isExpired() {
+ return (System.currentTimeMillis() - _time) >= 20000;
+ }
+ }
+
private static final Histogram _histo = new Histogram(3600000000000L, 3);
private static final AtomicLong _systemCPU = new AtomicLong(0);
private static final AtomicLong _userCPU = new AtomicLong(0);
private static final AtomicLong _gcCount = new AtomicLong(0);
private static final AtomicLong _gcMs = new AtomicLong(0);
+ private static final ConcurrentHashMap<String, MemMeasure> _memoryBytes = new ConcurrentHashMap<String, MemMeasure>();
+
+ private static long readMemory() {
+ long total = 0;
+ for (MemMeasure mem: _memoryBytes.values()) {
+ total += mem.get();
+ }
+ return total;
+ }
private static long _prev_acked = 0;
private static long _prev_uptime = 0;
@@ -273,10 +301,12 @@ public class ThroughputVsLatency {
long user = _userCPU.getAndSet(0);
long sys = _systemCPU.getAndSet(0);
long gc = _gcMs.getAndSet(0);
- System.out.printf("uptime: %,4d acked: %,8d acked/sec: %,8.2f failed: %,8d " +
- "99%%: %,15d 99.9%%: %,15d min: %,15d max: %,15d mean: %,15.2f stddev: %,15.2f user: %,10d sys: %,10d gc: %,10d\n",
+ double memMB = readMemory() / (1024.0 * 1024.0);
+ System.out.printf("uptime: %,4d acked: %,9d acked/sec: %,10.2f failed: %,8d " +
+ "99%%: %,15d 99.9%%: %,15d min: %,15d max: %,15d mean: %,15.2f " +
+ "stddev: %,15.2f user: %,10d sys: %,10d gc: %,10d mem: %,10.2f\n",
uptime, ackedThisTime, (((double)ackedThisTime)/thisTime), failed, nnpct, nnnpct,
- min, max, mean, stddev, user, sys, gc);
+ min, max, mean, stddev, user, sys, gc, memMB);
_prev_uptime = uptime;
_prev_acked = acked;
}
@@ -312,6 +342,7 @@ public class ThroughputVsLatency {
HttpForwardingMetricsServer metricServer = new HttpForwardingMetricsServer(conf) {
@Override
public void handle(TaskInfo taskInfo, Collection<DataPoint> dataPoints) {
+ String worker = taskInfo.srcWorkerHost + ":" + taskInfo.srcWorkerPort;
for (DataPoint dp: dataPoints) {
if ("comp-lat-histo".equals(dp.name) && dp.value instanceof Histogram) {
synchronized(_histo) {
@@ -337,6 +368,18 @@ public class ThroughputVsLatency {
if (time instanceof Number) {
_gcMs.getAndAdd(((Number)time).longValue());
}
+ } else if (dp.name.startsWith("memory/") && dp.value instanceof Map) {
+ Map<Object, Object> m = (Map<Object, Object>)dp.value;
+ Object val = m.get("usedBytes");
+ if (val instanceof Number) {
+ MemMeasure mm = _memoryBytes.get(worker);
+ if (mm == null) {
+ mm = new MemMeasure();
+ MemMeasure tmp = _memoryBytes.putIfAbsent(worker, mm);
+ mm = tmp == null ? mm : tmp;
+ }
+ mm.update(((Number)val).longValue());
+ }
}
}
}
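The memory tracking in this commit keeps one reading per worker (keyed by host:port), ignores readings older than 20 seconds, and sums the rest. A minimal standalone sketch of that idea is below. It is not the commit's code: the class and method names are hypothetical, and the timestamp is passed in as a parameter (rather than read from System.currentTimeMillis()) purely so the expiry behaviour is easy to exercise.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Hypothetical sketch of per-worker readings that expire after a fixed window.
class ExpiringMemorySketch {
    static final long EXPIRY_MS = 20000; // same window as MemMeasure in the diff

    // Latest reading for one worker, with the time it was recorded.
    static final class Reading {
        private long bytes = 0;
        private long time = 0;

        synchronized void update(long b, long nowMs) {
            bytes = b;
            time = nowMs;
        }

        // A stale reading counts as zero so dead workers drop out of the total.
        synchronized long get(long nowMs) {
            return (nowMs - time) >= EXPIRY_MS ? 0L : bytes;
        }
    }

    private final ConcurrentMap<String, Reading> byWorker = new ConcurrentHashMap<>();

    void record(String worker, long bytes, long nowMs) {
        Reading r = byWorker.get(worker);
        if (r == null) {
            // putIfAbsent returns the existing value if another thread won the race.
            Reading fresh = new Reading();
            Reading prev = byWorker.putIfAbsent(worker, fresh);
            r = (prev == null) ? fresh : prev;
        }
        r.update(bytes, nowMs);
    }

    long total(long nowMs) {
        long sum = 0;
        for (Reading r : byWorker.values()) {
            sum += r.get(nowMs);
        }
        return sum;
    }
}
```

Keying by worker rather than by task matters here because every task in a worker reports the same JVM-wide memory figure; summing per task would count the same heap several times.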
[3/5] storm git commit: Adjusted memory parameters to be closer to what Yahoo uses in production. Fixed small bug
Posted by bo...@apache.org.
Adjusted memory parameters to be closer to what Yahoo uses in production. Fixed small bug
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/b60bb72c
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/b60bb72c
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/b60bb72c
Branch: refs/heads/master
Commit: b60bb72c9f572f5cdd99d8deeba1ab214d53e18d
Parents: 4c2cb4f
Author: Robert (Bobby) Evans <ev...@yahoo-inc.com>
Authored: Mon Oct 19 08:32:56 2015 -0500
Committer: Robert (Bobby) Evans <ev...@yahoo-inc.com>
Committed: Mon Oct 19 08:32:56 2015 -0500
----------------------------------------------------------------------
.../src/jvm/storm/starter/ThroughputVsLatency.java | 14 ++++++++++++--
1 file changed, 12 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/b60bb72c/examples/storm-starter/src/jvm/storm/starter/ThroughputVsLatency.java
----------------------------------------------------------------------
diff --git a/examples/storm-starter/src/jvm/storm/starter/ThroughputVsLatency.java b/examples/storm-starter/src/jvm/storm/starter/ThroughputVsLatency.java
index a9070ae..2608755 100644
--- a/examples/storm-starter/src/jvm/storm/starter/ThroughputVsLatency.java
+++ b/examples/storm-starter/src/jvm/storm/starter/ThroughputVsLatency.java
@@ -115,6 +115,10 @@ public class ThroughputVsLatency {
StormSubmitter.submitTopology(name, stormConf, topology);
}
}
+
+ public boolean isLocal() {
+ return _local != null;
+ }
}
public static class FastRandomSentenceSpout extends BaseRichSpout {
@@ -388,12 +392,19 @@ public class ThroughputVsLatency {
metricServer.serve();
String url = metricServer.getUrl();
+ C cluster = new C(conf);
conf.setNumWorkers(parallelism);
conf.registerMetricsConsumer(backtype.storm.metric.HttpForwardingMetricsConsumer.class, url, 1);
Map<String, String> workerMetrics = new HashMap<String, String>();
- workerMetrics.put("CPU", "org.apache.storm.metrics.sigar.CPUMetric");
+ if (!cluster.isLocal()) {
+ //sigar uses JNI and does not work in local mode
+ workerMetrics.put("CPU", "org.apache.storm.metrics.sigar.CPUMetric");
+ }
conf.put(Config.TOPOLOGY_WORKER_METRICS, workerMetrics);
conf.put(Config.TOPOLOGY_BUILTIN_METRICS_BUCKET_SIZE_SECS, 10);
+ conf.put(Config.TOPOLOGY_WORKER_GC_CHILDOPTS,
+ "-XX:+UseConcMarkSweepGC -XX:+UseParNewGC -XX:NewSize=128m -XX:CMSInitiatingOccupancyFraction=70 -XX:-CMSConcurrentMTEnabled");
+ conf.put(Config.TOPOLOGY_WORKER_CHILDOPTS, "-Xmx2g");
TopologyBuilder builder = new TopologyBuilder();
@@ -403,7 +414,6 @@ public class ThroughputVsLatency {
builder.setBolt("split", new SplitSentence(), numEach).shuffleGrouping("spout");
builder.setBolt("count", new WordCount(), numEach).fieldsGrouping("split", new Fields("word"));
- C cluster = new C(conf);
try {
cluster.submitTopology(name, conf, builder.createTopology());
[4/5] storm git commit: Merge branch 'STORM-1118-perf' of https://github.com/revans2/incubator-storm into STORM-1118
Posted by bo...@apache.org.
Merge branch 'STORM-1118-perf' of https://github.com/revans2/incubator-storm into STORM-1118
STORM-1118: Added test to compare latency vs. throughput in storm.
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/d6d9b788
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/d6d9b788
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/d6d9b788
Branch: refs/heads/master
Commit: d6d9b788e88fd440df1f3b4a0ecbacdb42ce4fd1
Parents: 647d672 b60bb72
Author: Robert (Bobby) Evans <ev...@yahoo-inc.com>
Authored: Tue Oct 20 09:38:21 2015 -0500
Committer: Robert (Bobby) Evans <ev...@yahoo-inc.com>
Committed: Tue Oct 20 09:38:21 2015 -0500
----------------------------------------------------------------------
LICENSE | 9 +
examples/storm-starter/pom.xml | 9 +
.../jvm/storm/starter/ThroughputVsLatency.java | 429 +++++++++++++++++++
external/storm-metrics/pom.xml | 58 +++
.../metrics/hdrhistogram/HistogramMetric.java | 79 ++++
.../apache/storm/metrics/sigar/CPUMetric.java | 60 +++
.../resources/libsigar-amd64-freebsd-6.so | Bin 0 -> 210641 bytes
.../resources/resources/libsigar-amd64-linux.so | Bin 0 -> 246605 bytes
.../resources/libsigar-amd64-solaris.so | Bin 0 -> 251360 bytes
.../resources/libsigar-ia64-hpux-11.sl | Bin 0 -> 577452 bytes
.../resources/resources/libsigar-ia64-linux.so | Bin 0 -> 494929 bytes
.../resources/resources/libsigar-pa-hpux-11.sl | Bin 0 -> 516096 bytes
.../resources/resources/libsigar-ppc-aix-5.so | Bin 0 -> 400925 bytes
.../resources/resources/libsigar-ppc-linux.so | Bin 0 -> 258547 bytes
.../resources/resources/libsigar-ppc64-aix-5.so | Bin 0 -> 425077 bytes
.../resources/resources/libsigar-ppc64-linux.so | Bin 0 -> 330767 bytes
.../resources/resources/libsigar-s390x-linux.so | Bin 0 -> 269932 bytes
.../resources/libsigar-sparc-solaris.so | Bin 0 -> 285004 bytes
.../resources/libsigar-sparc64-solaris.so | Bin 0 -> 261896 bytes
.../resources/libsigar-universal-macosx.dylib | Bin 0 -> 377668 bytes
.../resources/libsigar-universal64-macosx.dylib | Bin 0 -> 397440 bytes
.../resources/libsigar-x86-freebsd-5.so | Bin 0 -> 179751 bytes
.../resources/libsigar-x86-freebsd-6.so | Bin 0 -> 179379 bytes
.../resources/resources/libsigar-x86-linux.so | Bin 0 -> 233385 bytes
.../resources/resources/libsigar-x86-solaris.so | Bin 0 -> 242880 bytes
.../resources/resources/sigar-amd64-winnt.dll | Bin 0 -> 402432 bytes
.../resources/resources/sigar-x86-winnt.dll | Bin 0 -> 266240 bytes
.../resources/resources/sigar-x86-winnt.lib | Bin 0 -> 99584 bytes
pom.xml | 7 +
.../src/clj/backtype/storm/daemon/nimbus.clj | 2 +-
.../metric/HttpForwardingMetricsConsumer.java | 81 ++++
.../metric/HttpForwardingMetricsServer.java | 119 +++++
32 files changed, 852 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/d6d9b788/storm-core/src/clj/backtype/storm/daemon/nimbus.clj
----------------------------------------------------------------------
[5/5] storm git commit: Added STORM-1118 to Changelog
Posted by bo...@apache.org.
Added STORM-1118 to Changelog
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/4b4bbd08
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/4b4bbd08
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/4b4bbd08
Branch: refs/heads/master
Commit: 4b4bbd08bbf881566f9e79d3158ad0e486b00c8f
Parents: d6d9b78
Author: Robert (Bobby) Evans <ev...@yahoo-inc.com>
Authored: Tue Oct 20 09:51:02 2015 -0500
Committer: Robert (Bobby) Evans <ev...@yahoo-inc.com>
Committed: Tue Oct 20 09:51:02 2015 -0500
----------------------------------------------------------------------
CHANGELOG.md | 1 +
1 file changed, 1 insertion(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/4b4bbd08/CHANGELOG.md
----------------------------------------------------------------------
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 15a554d..0308cf3 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,4 +1,5 @@
## 0.11.0
+ * STORM-1118: Added test to compare latency vs. throughput in storm.
* STORM-1110: Fix Component Page for system components
* STORM-1093: Launching Workers with resources specified in resource-aware schedulers
* STORM-1102: Add a default flush interval for HiveBolt