You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by ka...@apache.org on 2017/04/12 08:29:11 UTC
[03/10] storm git commit: STORM-2447: add in storm local to avoid
having server on worker classpath
http://git-wip-us.apache.org/repos/asf/storm/blob/b254ede4/examples/storm-starter/src/jvm/org/apache/storm/starter/ThroughputVsLatency.java
----------------------------------------------------------------------
diff --git a/examples/storm-starter/src/jvm/org/apache/storm/starter/ThroughputVsLatency.java b/examples/storm-starter/src/jvm/org/apache/storm/starter/ThroughputVsLatency.java
index d8137b0..f541cb9 100644
--- a/examples/storm-starter/src/jvm/org/apache/storm/starter/ThroughputVsLatency.java
+++ b/examples/storm-starter/src/jvm/org/apache/storm/starter/ThroughputVsLatency.java
@@ -17,13 +17,28 @@
*/
package org.apache.storm.starter;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Random;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.HdrHistogram.Histogram;
import org.apache.storm.Config;
-import org.apache.storm.LocalCluster;
import org.apache.storm.StormSubmitter;
-import org.apache.storm.misc.metric.HttpForwardingMetricsServer;
-import org.apache.storm.metric.api.IMetricsConsumer.TaskInfo;
+import org.apache.storm.generated.ClusterSummary;
+import org.apache.storm.generated.ExecutorSummary;
+import org.apache.storm.generated.KillOptions;
+import org.apache.storm.generated.Nimbus;
+import org.apache.storm.generated.SpoutStats;
+import org.apache.storm.generated.TopologyInfo;
+import org.apache.storm.generated.TopologySummary;
import org.apache.storm.metric.api.IMetricsConsumer.DataPoint;
-import org.apache.storm.generated.*;
+import org.apache.storm.metric.api.IMetricsConsumer.TaskInfo;
+import org.apache.storm.metrics.hdrhistogram.HistogramMetric;
+import org.apache.storm.misc.metric.HttpForwardingMetricsServer;
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.BasicOutputCollector;
@@ -34,395 +49,328 @@ import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;
-import org.apache.storm.utils.Utils;
import org.apache.storm.utils.NimbusClient;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Random;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ThreadLocalRandom;
-import java.util.concurrent.atomic.AtomicLong;
-
-import org.apache.storm.metrics.hdrhistogram.HistogramMetric;
-import org.HdrHistogram.Histogram;
-
/**
* WordCount but the spout goes at a predefined rate and we collect
* proper latency statistics.
*/
public class ThroughputVsLatency {
- private static class SentWithTime {
- public final String sentence;
- public final long time;
+ private static class SentWithTime {
+ public final String sentence;
+ public final long time;
- SentWithTime(String sentence, long time) {
- this.sentence = sentence;
- this.time = time;
- }
- }
-
- public static class C {
- LocalCluster _local = null;
- Nimbus.Client _client = null;
-
- public C(Map conf) throws Exception {
- Map clusterConf = Utils.readStormConfig();
- if (conf != null) {
- clusterConf.putAll(conf);
- }
- Boolean isLocal = (Boolean)clusterConf.get("run.local");
- if (isLocal != null && isLocal) {
- _local = new LocalCluster();
- } else {
- _client = NimbusClient.getConfiguredClient(clusterConf).getClient();
- }
+ SentWithTime(String sentence, long time) {
+ this.sentence = sentence;
+ this.time = time;
+ }
}
- public ClusterSummary getClusterInfo() throws Exception {
- if (_local != null) {
- return _local.getClusterInfo();
- } else {
- return _client.getClusterInfo();
- }
- }
+ public static class FastRandomSentenceSpout extends BaseRichSpout {
+ static final String[] SENTENCES = new String[]{ "the cow jumped over the moon", "an apple a day keeps the doctor away",
+ "four score and seven years ago", "snow white and the seven dwarfs", "i am at two with nature" };
+
+ SpoutOutputCollector _collector;
+ long _periodNano;
+ long _emitAmount;
+ Random _rand;
+ long _nextEmitTime;
+ long _emitsLeft;
+ HistogramMetric _histo;
+
+ public FastRandomSentenceSpout(long ratePerSecond) {
+ if (ratePerSecond > 0) {
+ _periodNano = Math.max(1, 1000000000/ratePerSecond);
+ _emitAmount = Math.max(1, (long)((ratePerSecond / 1000000000.0) * _periodNano));
+ } else {
+ _periodNano = Long.MAX_VALUE - 1;
+ _emitAmount = 1;
+ }
+ }
- public TopologyInfo getTopologyInfo(String id) throws Exception {
- if (_local != null) {
- return _local.getTopologyInfo(id);
- } else {
- return _client.getTopologyInfo(id);
- }
- }
+ @Override
+ public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
+ _collector = collector;
+ _rand = ThreadLocalRandom.current();
+ _nextEmitTime = System.nanoTime();
+ _emitsLeft = _emitAmount;
+ _histo = new HistogramMetric(3600000000000L, 3);
+ context.registerMetric("comp-lat-histo", _histo, 10); //Update every 10 seconds, so we are not too far behind
+ }
- public void killTopologyWithOpts(String name, KillOptions opts) throws Exception {
- if (_local != null) {
- _local.killTopologyWithOpts(name, opts);
- } else {
- _client.killTopologyWithOpts(name, opts);
- }
- }
+ @Override
+ public void nextTuple() {
+ if (_emitsLeft <= 0 && _nextEmitTime <= System.nanoTime()) {
+ _emitsLeft = _emitAmount;
+ _nextEmitTime = _nextEmitTime + _periodNano;
+ }
- public void submitTopology(String name, Map stormConf, StormTopology topology) throws Exception {
- if (_local != null) {
- _local.submitTopology(name, stormConf, topology);
- } else {
- StormSubmitter.submitTopology(name, stormConf, topology);
- }
- }
+ if (_emitsLeft > 0) {
+ String sentence = SENTENCES[_rand.nextInt(SENTENCES.length)];
+ _collector.emit(new Values(sentence), new SentWithTime(sentence, _nextEmitTime - _periodNano));
+ _emitsLeft--;
+ }
+ }
- public boolean isLocal() {
- return _local != null;
- }
- }
-
- public static class FastRandomSentenceSpout extends BaseRichSpout {
- static final String[] SENTENCES = new String[]{ "the cow jumped over the moon", "an apple a day keeps the doctor away",
- "four score and seven years ago", "snow white and the seven dwarfs", "i am at two with nature" };
-
- SpoutOutputCollector _collector;
- long _periodNano;
- long _emitAmount;
- Random _rand;
- long _nextEmitTime;
- long _emitsLeft;
- HistogramMetric _histo;
-
- public FastRandomSentenceSpout(long ratePerSecond) {
- if (ratePerSecond > 0) {
- _periodNano = Math.max(1, 1000000000/ratePerSecond);
- _emitAmount = Math.max(1, (long)((ratePerSecond / 1000000000.0) * _periodNano));
- } else {
- _periodNano = Long.MAX_VALUE - 1;
- _emitAmount = 1;
+ @Override
+ public void ack(Object id) {
+ long end = System.nanoTime();
+ SentWithTime st = (SentWithTime)id;
+ _histo.recordValue(end-st.time);
}
- }
- @Override
- public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
- _collector = collector;
- _rand = ThreadLocalRandom.current();
- _nextEmitTime = System.nanoTime();
- _emitsLeft = _emitAmount;
- _histo = new HistogramMetric(3600000000000L, 3);
- context.registerMetric("comp-lat-histo", _histo, 10); //Update every 10 seconds, so we are not too far behind
- }
+ @Override
+ public void fail(Object id) {
+ SentWithTime st = (SentWithTime)id;
+ _collector.emit(new Values(st.sentence), id);
+ }
- @Override
- public void nextTuple() {
- if (_emitsLeft <= 0 && _nextEmitTime <= System.nanoTime()) {
- _emitsLeft = _emitAmount;
- _nextEmitTime = _nextEmitTime + _periodNano;
- }
-
- if (_emitsLeft > 0) {
- String sentence = SENTENCES[_rand.nextInt(SENTENCES.length)];
- _collector.emit(new Values(sentence), new SentWithTime(sentence, _nextEmitTime - _periodNano));
- _emitsLeft--;
- }
+ @Override
+ public void declareOutputFields(OutputFieldsDeclarer declarer) {
+ declarer.declare(new Fields("sentence"));
+ }
}
- @Override
- public void ack(Object id) {
- long end = System.nanoTime();
- SentWithTime st = (SentWithTime)id;
- _histo.recordValue(end-st.time);
- }
+ public static class SplitSentence extends BaseBasicBolt {
+ @Override
+ public void execute(Tuple tuple, BasicOutputCollector collector) {
+ String sentence = tuple.getString(0);
+ for (String word: sentence.split("\\s+")) {
+ collector.emit(new Values(word, 1));
+ }
+ }
- @Override
- public void fail(Object id) {
- SentWithTime st = (SentWithTime)id;
- _collector.emit(new Values(st.sentence), id);
+ @Override
+ public void declareOutputFields(OutputFieldsDeclarer declarer) {
+ declarer.declare(new Fields("word", "count"));
+ }
}
- @Override
- public void declareOutputFields(OutputFieldsDeclarer declarer) {
- declarer.declare(new Fields("sentence"));
- }
- }
-
- public static class SplitSentence extends BaseBasicBolt {
- @Override
- public void execute(Tuple tuple, BasicOutputCollector collector) {
- String sentence = tuple.getString(0);
- for (String word: sentence.split("\\s+")) {
- collector.emit(new Values(word, 1));
- }
- }
+ public static class WordCount extends BaseBasicBolt {
+ Map<String, Integer> counts = new HashMap<String, Integer>();
- @Override
- public void declareOutputFields(OutputFieldsDeclarer declarer) {
- declarer.declare(new Fields("word", "count"));
- }
- }
-
- public static class WordCount extends BaseBasicBolt {
- Map<String, Integer> counts = new HashMap<String, Integer>();
-
- @Override
- public void execute(Tuple tuple, BasicOutputCollector collector) {
- String word = tuple.getString(0);
- Integer count = counts.get(word);
- if (count == null)
- count = 0;
- count++;
- counts.put(word, count);
- collector.emit(new Values(word, count));
- }
+ @Override
+ public void execute(Tuple tuple, BasicOutputCollector collector) {
+ String word = tuple.getString(0);
+ Integer count = counts.get(word);
+ if (count == null)
+ count = 0;
+ count++;
+ counts.put(word, count);
+ collector.emit(new Values(word, count));
+ }
- @Override
- public void declareOutputFields(OutputFieldsDeclarer declarer) {
- declarer.declare(new Fields("word", "count"));
+ @Override
+ public void declareOutputFields(OutputFieldsDeclarer declarer) {
+ declarer.declare(new Fields("word", "count"));
+ }
}
- }
- private static class MemMeasure {
- private long _mem = 0;
- private long _time = 0;
+ private static class MemMeasure {
+ private long _mem = 0;
+ private long _time = 0;
- public synchronized void update(long mem) {
- _mem = mem;
- _time = System.currentTimeMillis();
- }
+ public synchronized void update(long mem) {
+ _mem = mem;
+ _time = System.currentTimeMillis();
+ }
- public synchronized long get() {
- return isExpired() ? 0l : _mem;
- }
+ public synchronized long get() {
+ return isExpired() ? 0l : _mem;
+ }
- public synchronized boolean isExpired() {
- return (System.currentTimeMillis() - _time) >= 20000;
- }
- }
-
- private static final Histogram _histo = new Histogram(3600000000000L, 3);
- private static final AtomicLong _systemCPU = new AtomicLong(0);
- private static final AtomicLong _userCPU = new AtomicLong(0);
- private static final AtomicLong _gcCount = new AtomicLong(0);
- private static final AtomicLong _gcMs = new AtomicLong(0);
- private static final ConcurrentHashMap<String, MemMeasure> _memoryBytes = new ConcurrentHashMap<String, MemMeasure>();
-
- private static long readMemory() {
- long total = 0;
- for (MemMeasure mem: _memoryBytes.values()) {
- total += mem.get();
- }
- return total;
- }
-
- private static long _prev_acked = 0;
- private static long _prev_uptime = 0;
-
- public static void printMetrics(C client, String name) throws Exception {
- ClusterSummary summary = client.getClusterInfo();
- String id = null;
- for (TopologySummary ts: summary.get_topologies()) {
- if (name.equals(ts.get_name())) {
- id = ts.get_id();
- }
+ public synchronized boolean isExpired() {
+ return (System.currentTimeMillis() - _time) >= 20000;
+ }
}
- if (id == null) {
- throw new Exception("Could not find a topology named "+name);
+
+ private static final Histogram _histo = new Histogram(3600000000000L, 3);
+ private static final AtomicLong _systemCPU = new AtomicLong(0);
+ private static final AtomicLong _userCPU = new AtomicLong(0);
+ private static final AtomicLong _gcCount = new AtomicLong(0);
+ private static final AtomicLong _gcMs = new AtomicLong(0);
+ private static final ConcurrentHashMap<String, MemMeasure> _memoryBytes = new ConcurrentHashMap<String, MemMeasure>();
+
+ private static long readMemory() {
+ long total = 0;
+ for (MemMeasure mem: _memoryBytes.values()) {
+ total += mem.get();
+ }
+ return total;
}
- TopologyInfo info = client.getTopologyInfo(id);
- int uptime = info.get_uptime_secs();
- long acked = 0;
- long failed = 0;
- for (ExecutorSummary exec: info.get_executors()) {
- if ("spout".equals(exec.get_component_id()) && exec.get_stats() != null && exec.get_stats().get_specific() != null) {
- SpoutStats stats = exec.get_stats().get_specific().get_spout();
- Map<String, Long> failedMap = stats.get_failed().get(":all-time");
- Map<String, Long> ackedMap = stats.get_acked().get(":all-time");
- if (ackedMap != null) {
- for (String key: ackedMap.keySet()) {
- if (failedMap != null) {
- Long tmp = failedMap.get(key);
- if (tmp != null) {
- failed += tmp;
- }
+
+ private static long _prev_acked = 0;
+ private static long _prev_uptime = 0;
+
+ public static void printMetrics(Nimbus.Iface client, String name) throws Exception {
+ ClusterSummary summary = client.getClusterInfo();
+ String id = null;
+ for (TopologySummary ts: summary.get_topologies()) {
+ if (name.equals(ts.get_name())) {
+ id = ts.get_id();
}
- long ackVal = ackedMap.get(key);
- acked += ackVal;
- }
}
- }
- }
- long ackedThisTime = acked - _prev_acked;
- long thisTime = uptime - _prev_uptime;
- long nnpct, nnnpct, min, max;
- double mean, stddev;
- synchronized(_histo) {
- nnpct = _histo.getValueAtPercentile(99.0);
- nnnpct = _histo.getValueAtPercentile(99.9);
- min = _histo.getMinValue();
- max = _histo.getMaxValue();
- mean = _histo.getMean();
- stddev = _histo.getStdDeviation();
- _histo.reset();
- }
- long user = _userCPU.getAndSet(0);
- long sys = _systemCPU.getAndSet(0);
- long gc = _gcMs.getAndSet(0);
- double memMB = readMemory() / (1024.0 * 1024.0);
- System.out.printf("uptime: %,4d acked: %,9d acked/sec: %,10.2f failed: %,8d " +
- "99%%: %,15d 99.9%%: %,15d min: %,15d max: %,15d mean: %,15.2f " +
- "stddev: %,15.2f user: %,10d sys: %,10d gc: %,10d mem: %,10.2f\n",
- uptime, ackedThisTime, (((double)ackedThisTime)/thisTime), failed, nnpct, nnnpct,
- min, max, mean, stddev, user, sys, gc, memMB);
- _prev_uptime = uptime;
- _prev_acked = acked;
- }
-
- public static void kill(C client, String name) throws Exception {
- KillOptions opts = new KillOptions();
- opts.set_wait_secs(0);
- client.killTopologyWithOpts(name, opts);
- }
-
- public static void main(String[] args) throws Exception {
- long ratePerSecond = 500;
- if (args != null && args.length > 0) {
- ratePerSecond = Long.valueOf(args[0]);
+ if (id == null) {
+ throw new Exception("Could not find a topology named "+name);
+ }
+ TopologyInfo info = client.getTopologyInfo(id);
+ int uptime = info.get_uptime_secs();
+ long acked = 0;
+ long failed = 0;
+ for (ExecutorSummary exec: info.get_executors()) {
+ if ("spout".equals(exec.get_component_id()) && exec.get_stats() != null && exec.get_stats().get_specific() != null) {
+ SpoutStats stats = exec.get_stats().get_specific().get_spout();
+ Map<String, Long> failedMap = stats.get_failed().get(":all-time");
+ Map<String, Long> ackedMap = stats.get_acked().get(":all-time");
+ if (ackedMap != null) {
+ for (String key: ackedMap.keySet()) {
+ if (failedMap != null) {
+ Long tmp = failedMap.get(key);
+ if (tmp != null) {
+ failed += tmp;
+ }
+ }
+ long ackVal = ackedMap.get(key);
+ acked += ackVal;
+ }
+ }
+ }
+ }
+ long ackedThisTime = acked - _prev_acked;
+ long thisTime = uptime - _prev_uptime;
+ long nnpct, nnnpct, min, max;
+ double mean, stddev;
+ synchronized(_histo) {
+ nnpct = _histo.getValueAtPercentile(99.0);
+ nnnpct = _histo.getValueAtPercentile(99.9);
+ min = _histo.getMinValue();
+ max = _histo.getMaxValue();
+ mean = _histo.getMean();
+ stddev = _histo.getStdDeviation();
+ _histo.reset();
+ }
+ long user = _userCPU.getAndSet(0);
+ long sys = _systemCPU.getAndSet(0);
+ long gc = _gcMs.getAndSet(0);
+ double memMB = readMemory() / (1024.0 * 1024.0);
+ System.out.printf("uptime: %,4d acked: %,9d acked/sec: %,10.2f failed: %,8d " +
+ "99%%: %,15d 99.9%%: %,15d min: %,15d max: %,15d mean: %,15.2f " +
+ "stddev: %,15.2f user: %,10d sys: %,10d gc: %,10d mem: %,10.2f\n",
+ uptime, ackedThisTime, (((double)ackedThisTime)/thisTime), failed, nnpct, nnnpct,
+ min, max, mean, stddev, user, sys, gc, memMB);
+ _prev_uptime = uptime;
+ _prev_acked = acked;
}
- int parallelism = 4;
- if (args != null && args.length > 1) {
- parallelism = Integer.valueOf(args[1]);
+ public static void kill(Nimbus.Iface client, String name) throws Exception {
+ KillOptions opts = new KillOptions();
+ opts.set_wait_secs(0);
+ client.killTopologyWithOpts(name, opts);
}
- int numMins = 5;
- if (args != null && args.length > 2) {
- numMins = Integer.valueOf(args[2]);
- }
+ public static void main(String[] args) throws Exception {
+ long ratePerSecond = 500;
+ if (args != null && args.length > 0) {
+ ratePerSecond = Long.valueOf(args[0]);
+ }
- String name = "wc-test";
- if (args != null && args.length > 3) {
- name = args[3];
- }
+ int parallelism = 4;
+ if (args != null && args.length > 1) {
+ parallelism = Integer.valueOf(args[1]);
+ }
- Config conf = new Config();
- HttpForwardingMetricsServer metricServer = new HttpForwardingMetricsServer(conf) {
- @Override
- public void handle(TaskInfo taskInfo, Collection<DataPoint> dataPoints) {
- String worker = taskInfo.srcWorkerHost + ":" + taskInfo.srcWorkerPort;
- for (DataPoint dp: dataPoints) {
- if ("comp-lat-histo".equals(dp.name) && dp.value instanceof Histogram) {
- synchronized(_histo) {
- _histo.add((Histogram)dp.value);
+ int numMins = 5;
+ if (args != null && args.length > 2) {
+ numMins = Integer.valueOf(args[2]);
+ }
+
+ String name = "wc-test";
+ if (args != null && args.length > 3) {
+ name = args[3];
+ }
+
+ Config conf = new Config();
+ HttpForwardingMetricsServer metricServer = new HttpForwardingMetricsServer(conf) {
+ @Override
+ public void handle(TaskInfo taskInfo, Collection<DataPoint> dataPoints) {
+ String worker = taskInfo.srcWorkerHost + ":" + taskInfo.srcWorkerPort;
+ for (DataPoint dp: dataPoints) {
+ if ("comp-lat-histo".equals(dp.name) && dp.value instanceof Histogram) {
+ synchronized(_histo) {
+ _histo.add((Histogram)dp.value);
+ }
+ } else if ("CPU".equals(dp.name) && dp.value instanceof Map) {
+ Map<Object, Object> m = (Map<Object, Object>)dp.value;
+ Object sys = m.get("sys-ms");
+ if (sys instanceof Number) {
+ _systemCPU.getAndAdd(((Number)sys).longValue());
+ }
+ Object user = m.get("user-ms");
+ if (user instanceof Number) {
+ _userCPU.getAndAdd(((Number)user).longValue());
+ }
+ } else if (dp.name.startsWith("GC/") && dp.value instanceof Map) {
+ Map<Object, Object> m = (Map<Object, Object>)dp.value;
+ Object count = m.get("count");
+ if (count instanceof Number) {
+ _gcCount.getAndAdd(((Number)count).longValue());
+ }
+ Object time = m.get("timeMs");
+ if (time instanceof Number) {
+ _gcMs.getAndAdd(((Number)time).longValue());
+ }
+ } else if (dp.name.startsWith("memory/") && dp.value instanceof Map) {
+ Map<Object, Object> m = (Map<Object, Object>)dp.value;
+ Object val = m.get("usedBytes");
+ if (val instanceof Number) {
+ MemMeasure mm = _memoryBytes.get(worker);
+ if (mm == null) {
+ mm = new MemMeasure();
+ MemMeasure tmp = _memoryBytes.putIfAbsent(worker, mm);
+ mm = tmp == null ? mm : tmp;
+ }
+ mm.update(((Number)val).longValue());
+ }
}
- } else if ("CPU".equals(dp.name) && dp.value instanceof Map) {
- Map<Object, Object> m = (Map<Object, Object>)dp.value;
- Object sys = m.get("sys-ms");
- if (sys instanceof Number) {
- _systemCPU.getAndAdd(((Number)sys).longValue());
- }
- Object user = m.get("user-ms");
- if (user instanceof Number) {
- _userCPU.getAndAdd(((Number)user).longValue());
- }
- } else if (dp.name.startsWith("GC/") && dp.value instanceof Map) {
- Map<Object, Object> m = (Map<Object, Object>)dp.value;
- Object count = m.get("count");
- if (count instanceof Number) {
- _gcCount.getAndAdd(((Number)count).longValue());
- }
- Object time = m.get("timeMs");
- if (time instanceof Number) {
- _gcMs.getAndAdd(((Number)time).longValue());
- }
- } else if (dp.name.startsWith("memory/") && dp.value instanceof Map) {
- Map<Object, Object> m = (Map<Object, Object>)dp.value;
- Object val = m.get("usedBytes");
- if (val instanceof Number) {
- MemMeasure mm = _memoryBytes.get(worker);
- if (mm == null) {
- mm = new MemMeasure();
- MemMeasure tmp = _memoryBytes.putIfAbsent(worker, mm);
- mm = tmp == null ? mm : tmp;
- }
- mm.update(((Number)val).longValue());
- }
}
}
+ };
+
+ metricServer.serve();
+ String url = metricServer.getUrl();
+
+ NimbusClient client = NimbusClient.getConfiguredClient(conf);
+ conf.setNumWorkers(parallelism);
+ conf.registerMetricsConsumer(org.apache.storm.metric.LoggingMetricsConsumer.class);
+ conf.registerMetricsConsumer(org.apache.storm.misc.metric.HttpForwardingMetricsConsumer.class, url, 1);
+ Map<String, String> workerMetrics = new HashMap<String, String>();
+ if (!NimbusClient.isLocalOverride()) {
+ //sigar uses JNI and does not work in local mode
+ workerMetrics.put("CPU", "org.apache.storm.metrics.sigar.CPUMetric");
}
- };
-
- metricServer.serve();
- String url = metricServer.getUrl();
-
- C cluster = new C(conf);
- conf.setNumWorkers(parallelism);
- conf.registerMetricsConsumer(org.apache.storm.metric.LoggingMetricsConsumer.class);
- conf.registerMetricsConsumer(org.apache.storm.misc.metric.HttpForwardingMetricsConsumer.class, url, 1);
- Map<String, String> workerMetrics = new HashMap<String, String>();
- if (!cluster.isLocal()) {
- //sigar uses JNI and does not work in local mode
- workerMetrics.put("CPU", "org.apache.storm.metrics.sigar.CPUMetric");
- }
- conf.put(Config.TOPOLOGY_WORKER_METRICS, workerMetrics);
- conf.put(Config.TOPOLOGY_BUILTIN_METRICS_BUCKET_SIZE_SECS, 10);
- conf.put(Config.TOPOLOGY_WORKER_GC_CHILDOPTS,
- "-XX:+UseConcMarkSweepGC -XX:+UseParNewGC -XX:+UseConcMarkSweepGC -XX:NewSize=128m -XX:CMSInitiatingOccupancyFraction=70 -XX:-CMSConcurrentMTEnabled");
- conf.put(Config.TOPOLOGY_WORKER_CHILDOPTS, "-Xmx2g");
+ conf.put(Config.TOPOLOGY_WORKER_METRICS, workerMetrics);
+ conf.put(Config.TOPOLOGY_BUILTIN_METRICS_BUCKET_SIZE_SECS, 10);
+ conf.put(Config.TOPOLOGY_WORKER_GC_CHILDOPTS,
+ "-XX:+UseConcMarkSweepGC -XX:+UseParNewGC -XX:+UseConcMarkSweepGC -XX:NewSize=128m -XX:CMSInitiatingOccupancyFraction=70 -XX:-CMSConcurrentMTEnabled");
+ conf.put(Config.TOPOLOGY_WORKER_CHILDOPTS, "-Xmx2g");
- TopologyBuilder builder = new TopologyBuilder();
+ TopologyBuilder builder = new TopologyBuilder();
- int numEach = 4 * parallelism;
- builder.setSpout("spout", new FastRandomSentenceSpout(ratePerSecond/numEach), numEach);
+ int numEach = 4 * parallelism;
+ builder.setSpout("spout", new FastRandomSentenceSpout(ratePerSecond/numEach), numEach);
- builder.setBolt("split", new SplitSentence(), numEach).shuffleGrouping("spout");
- builder.setBolt("count", new WordCount(), numEach).fieldsGrouping("split", new Fields("word"));
+ builder.setBolt("split", new SplitSentence(), numEach).shuffleGrouping("spout");
+ builder.setBolt("count", new WordCount(), numEach).fieldsGrouping("split", new Fields("word"));
- try {
- cluster.submitTopology(name, conf, builder.createTopology());
+ try {
+ StormSubmitter.submitTopology(name, conf, builder.createTopology());
- for (int i = 0; i < numMins * 2; i++) {
- Thread.sleep(30 * 1000);
- printMetrics(cluster, name);
+ for (int i = 0; i < numMins * 2; i++) {
+ Thread.sleep(30 * 1000);
+ printMetrics(client.getClient(), name);
+ }
+ } finally {
+ kill(client.getClient(), name);
}
- } finally {
- kill(cluster, name);
}
- System.exit(0);
- }
}
http://git-wip-us.apache.org/repos/asf/storm/blob/b254ede4/examples/storm-starter/src/jvm/org/apache/storm/starter/TransactionalGlobalCount.java
----------------------------------------------------------------------
diff --git a/examples/storm-starter/src/jvm/org/apache/storm/starter/TransactionalGlobalCount.java b/examples/storm-starter/src/jvm/org/apache/storm/starter/TransactionalGlobalCount.java
index 5a310f6..c1e9555 100644
--- a/examples/storm-starter/src/jvm/org/apache/storm/starter/TransactionalGlobalCount.java
+++ b/examples/storm-starter/src/jvm/org/apache/storm/starter/TransactionalGlobalCount.java
@@ -17,9 +17,14 @@
*/
package org.apache.storm.starter;
+import java.math.BigInteger;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
import org.apache.storm.Config;
-import org.apache.storm.LocalCluster;
-import org.apache.storm.LocalCluster.LocalTopology;
+import org.apache.storm.StormSubmitter;
import org.apache.storm.coordination.BatchOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.testing.MemoryTransactionalSpout;
@@ -32,12 +37,7 @@ import org.apache.storm.transactional.TransactionalTopologyBuilder;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;
-
-import java.math.BigInteger;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
+import org.apache.storm.utils.NimbusClient;
/**
* This is a basic example of a transactional topology. It keeps a count of the number of tuples seen so far in a
@@ -46,128 +46,129 @@ import java.util.Map;
* @see <a href="http://storm.apache.org/documentation/Transactional-topologies.html">Transactional topologies</a>
*/
public class TransactionalGlobalCount {
- public static final int PARTITION_TAKE_PER_BATCH = 3;
- public static final Map<Integer, List<List<Object>>> DATA = new HashMap<Integer, List<List<Object>>>() {{
- put(0, new ArrayList<List<Object>>() {{
- add(new Values("cat"));
- add(new Values("dog"));
- add(new Values("chicken"));
- add(new Values("cat"));
- add(new Values("dog"));
- add(new Values("apple"));
- }});
- put(1, new ArrayList<List<Object>>() {{
- add(new Values("cat"));
- add(new Values("dog"));
- add(new Values("apple"));
- add(new Values("banana"));
- }});
- put(2, new ArrayList<List<Object>>() {{
- add(new Values("cat"));
- add(new Values("cat"));
- add(new Values("cat"));
- add(new Values("cat"));
- add(new Values("cat"));
- add(new Values("dog"));
- add(new Values("dog"));
- add(new Values("dog"));
- add(new Values("dog"));
- }});
- }};
-
- public static class Value {
- int count = 0;
- BigInteger txid;
- }
-
- public static Map<String, Value> DATABASE = new HashMap<String, Value>();
- public static final String GLOBAL_COUNT_KEY = "GLOBAL-COUNT";
-
- public static class BatchCount extends BaseBatchBolt {
- Object _id;
- BatchOutputCollector _collector;
-
- int _count = 0;
-
- @Override
- public void prepare(Map conf, TopologyContext context, BatchOutputCollector collector, Object id) {
- _collector = collector;
- _id = id;
+ public static final int PARTITION_TAKE_PER_BATCH = 3;
+ public static final Map<Integer, List<List<Object>>> DATA = new HashMap<Integer, List<List<Object>>>() {{
+ put(0, new ArrayList<List<Object>>() {{
+ add(new Values("cat"));
+ add(new Values("dog"));
+ add(new Values("chicken"));
+ add(new Values("cat"));
+ add(new Values("dog"));
+ add(new Values("apple"));
+ }});
+ put(1, new ArrayList<List<Object>>() {{
+ add(new Values("cat"));
+ add(new Values("dog"));
+ add(new Values("apple"));
+ add(new Values("banana"));
+ }});
+ put(2, new ArrayList<List<Object>>() {{
+ add(new Values("cat"));
+ add(new Values("cat"));
+ add(new Values("cat"));
+ add(new Values("cat"));
+ add(new Values("cat"));
+ add(new Values("dog"));
+ add(new Values("dog"));
+ add(new Values("dog"));
+ add(new Values("dog"));
+ }});
+ }};
+
+ public static class Value {
+ int count = 0;
+ BigInteger txid;
}
- @Override
- public void execute(Tuple tuple) {
- _count++;
- }
+ public static Map<String, Value> DATABASE = new HashMap<String, Value>();
+ public static final String GLOBAL_COUNT_KEY = "GLOBAL-COUNT";
- @Override
- public void finishBatch() {
- _collector.emit(new Values(_id, _count));
- }
+ public static class BatchCount extends BaseBatchBolt {
+ Object _id;
+ BatchOutputCollector _collector;
- @Override
- public void declareOutputFields(OutputFieldsDeclarer declarer) {
- declarer.declare(new Fields("id", "count"));
- }
- }
+ int _count = 0;
- public static class UpdateGlobalCount extends BaseTransactionalBolt implements ICommitter {
- TransactionAttempt _attempt;
- BatchOutputCollector _collector;
+ @Override
+ public void prepare(Map conf, TopologyContext context, BatchOutputCollector collector, Object id) {
+ _collector = collector;
+ _id = id;
+ }
- int _sum = 0;
+ @Override
+ public void execute(Tuple tuple) {
+ _count++;
+ }
- @Override
- public void prepare(Map conf, TopologyContext context, BatchOutputCollector collector, TransactionAttempt attempt) {
- _collector = collector;
- _attempt = attempt;
- }
+ @Override
+ public void finishBatch() {
+ _collector.emit(new Values(_id, _count));
+ }
- @Override
- public void execute(Tuple tuple) {
- _sum += tuple.getInteger(1);
+ @Override
+ public void declareOutputFields(OutputFieldsDeclarer declarer) {
+ declarer.declare(new Fields("id", "count"));
+ }
}
- @Override
- public void finishBatch() {
- Value val = DATABASE.get(GLOBAL_COUNT_KEY);
- Value newval;
- if (val == null || !val.txid.equals(_attempt.getTransactionId())) {
- newval = new Value();
- newval.txid = _attempt.getTransactionId();
- if (val == null) {
- newval.count = _sum;
+ public static class UpdateGlobalCount extends BaseTransactionalBolt implements ICommitter {
+ TransactionAttempt _attempt;
+ BatchOutputCollector _collector;
+
+ int _sum = 0;
+
+ @Override
+ public void prepare(Map conf, TopologyContext context, BatchOutputCollector collector, TransactionAttempt attempt) {
+ _collector = collector;
+ _attempt = attempt;
}
- else {
- newval.count = _sum + val.count;
+
+ @Override
+ public void execute(Tuple tuple) {
+ _sum += tuple.getInteger(1);
}
- DATABASE.put(GLOBAL_COUNT_KEY, newval);
- }
- else {
- newval = val;
- }
- _collector.emit(new Values(_attempt, newval.count));
- }
- @Override
- public void declareOutputFields(OutputFieldsDeclarer declarer) {
- declarer.declare(new Fields("id", "sum"));
+ @Override
+ public void finishBatch() {
+ Value val = DATABASE.get(GLOBAL_COUNT_KEY);
+ Value newval;
+ if (val == null || !val.txid.equals(_attempt.getTransactionId())) {
+ newval = new Value();
+ newval.txid = _attempt.getTransactionId();
+ if (val == null) {
+ newval.count = _sum;
+ }
+ else {
+ newval.count = _sum + val.count;
+ }
+ DATABASE.put(GLOBAL_COUNT_KEY, newval);
+ }
+ else {
+ newval = val;
+ }
+ _collector.emit(new Values(_attempt, newval.count));
+ }
+
+ @Override
+ public void declareOutputFields(OutputFieldsDeclarer declarer) {
+ declarer.declare(new Fields("id", "sum"));
+ }
}
- }
-
- public static void main(String[] args) throws Exception {
- MemoryTransactionalSpout spout = new MemoryTransactionalSpout(DATA, new Fields("word"), PARTITION_TAKE_PER_BATCH);
- TransactionalTopologyBuilder builder = new TransactionalTopologyBuilder("global-count", "spout", spout, 3);
- builder.setBolt("partial-count", new BatchCount(), 5).noneGrouping("spout");
- builder.setBolt("sum", new UpdateGlobalCount()).globalGrouping("partial-count");
-
- Config config = new Config();
- config.setDebug(true);
- config.setMaxSpoutPending(3);
-
- try (LocalCluster cluster = new LocalCluster();
- LocalTopology topo = cluster.submitTopology("global-count-topology", config, builder.buildTopology());) {
- Thread.sleep(3000);
+
+ public static void main(String[] args) throws Exception {
+ if (!NimbusClient.isLocalOverride()) {
+ throw new IllegalStateException("This example only works in local mode. "
+ + "Run with storm local not storm jar");
+ }
+ MemoryTransactionalSpout spout = new MemoryTransactionalSpout(DATA, new Fields("word"), PARTITION_TAKE_PER_BATCH);
+ TransactionalTopologyBuilder builder = new TransactionalTopologyBuilder("global-count", "spout", spout, 3);
+ builder.setBolt("partial-count", new BatchCount(), 5).noneGrouping("spout");
+ builder.setBolt("sum", new UpdateGlobalCount()).globalGrouping("partial-count");
+
+ Config config = new Config();
+ config.setDebug(true);
+ config.setMaxSpoutPending(3);
+
+ StormSubmitter.submitTopology("global-count-topology", config, builder.buildTopology());
}
- }
}
http://git-wip-us.apache.org/repos/asf/storm/blob/b254ede4/examples/storm-starter/src/jvm/org/apache/storm/starter/TransactionalWords.java
----------------------------------------------------------------------
diff --git a/examples/storm-starter/src/jvm/org/apache/storm/starter/TransactionalWords.java b/examples/storm-starter/src/jvm/org/apache/storm/starter/TransactionalWords.java
index 4965565..e922bab 100644
--- a/examples/storm-starter/src/jvm/org/apache/storm/starter/TransactionalWords.java
+++ b/examples/storm-starter/src/jvm/org/apache/storm/starter/TransactionalWords.java
@@ -17,9 +17,14 @@
*/
package org.apache.storm.starter;
+import java.math.BigInteger;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
import org.apache.storm.Config;
-import org.apache.storm.LocalCluster;
-import org.apache.storm.LocalCluster.LocalTopology;
+import org.apache.storm.StormSubmitter;
import org.apache.storm.coordination.BatchOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.testing.MemoryTransactionalSpout;
@@ -33,12 +38,7 @@ import org.apache.storm.transactional.TransactionalTopologyBuilder;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;
-
-import java.math.BigInteger;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
+import org.apache.storm.utils.NimbusClient;
/**
* This class defines a more involved transactional topology than TransactionalGlobalCount. This topology processes a
@@ -51,193 +51,194 @@ import java.util.Map;
* between buckets as their counts accumulate.
*/
public class TransactionalWords {
- public static class CountValue {
- Integer prev_count = null;
- int count = 0;
- BigInteger txid = null;
- }
-
- public static class BucketValue {
- int count = 0;
- BigInteger txid;
- }
-
- public static final int BUCKET_SIZE = 10;
-
- public static Map<String, CountValue> COUNT_DATABASE = new HashMap<String, CountValue>();
- public static Map<Integer, BucketValue> BUCKET_DATABASE = new HashMap<Integer, BucketValue>();
-
-
- public static final int PARTITION_TAKE_PER_BATCH = 3;
-
- public static final Map<Integer, List<List<Object>>> DATA = new HashMap<Integer, List<List<Object>>>() {{
- put(0, new ArrayList<List<Object>>() {{
- add(new Values("cat"));
- add(new Values("dog"));
- add(new Values("chicken"));
- add(new Values("cat"));
- add(new Values("dog"));
- add(new Values("apple"));
- }});
- put(1, new ArrayList<List<Object>>() {{
- add(new Values("cat"));
- add(new Values("dog"));
- add(new Values("apple"));
- add(new Values("banana"));
- }});
- put(2, new ArrayList<List<Object>>() {{
- add(new Values("cat"));
- add(new Values("cat"));
- add(new Values("cat"));
- add(new Values("cat"));
- add(new Values("cat"));
- add(new Values("dog"));
- add(new Values("dog"));
- add(new Values("dog"));
- add(new Values("dog"));
- }});
- }};
-
- public static class KeyedCountUpdater extends BaseTransactionalBolt implements ICommitter {
- Map<String, Integer> _counts = new HashMap<String, Integer>();
- BatchOutputCollector _collector;
- TransactionAttempt _id;
-
- int _count = 0;
-
- @Override
- public void prepare(Map conf, TopologyContext context, BatchOutputCollector collector, TransactionAttempt id) {
- _collector = collector;
- _id = id;
+ public static class CountValue {
+ Integer prev_count = null;
+ int count = 0;
+ BigInteger txid = null;
}
- @Override
- public void execute(Tuple tuple) {
- String key = tuple.getString(1);
- Integer curr = _counts.get(key);
- if (curr == null)
- curr = 0;
- _counts.put(key, curr + 1);
+ public static class BucketValue {
+ int count = 0;
+ BigInteger txid;
}
- @Override
- public void finishBatch() {
- for (String key : _counts.keySet()) {
- CountValue val = COUNT_DATABASE.get(key);
- CountValue newVal;
- if (val == null || !val.txid.equals(_id)) {
- newVal = new CountValue();
- newVal.txid = _id.getTransactionId();
- if (val != null) {
- newVal.prev_count = val.count;
- newVal.count = val.count;
- }
- newVal.count = newVal.count + _counts.get(key);
- COUNT_DATABASE.put(key, newVal);
+ public static final int BUCKET_SIZE = 10;
+
+ public static Map<String, CountValue> COUNT_DATABASE = new HashMap<String, CountValue>();
+ public static Map<Integer, BucketValue> BUCKET_DATABASE = new HashMap<Integer, BucketValue>();
+
+
+ public static final int PARTITION_TAKE_PER_BATCH = 3;
+
+ public static final Map<Integer, List<List<Object>>> DATA = new HashMap<Integer, List<List<Object>>>() {{
+ put(0, new ArrayList<List<Object>>() {{
+ add(new Values("cat"));
+ add(new Values("dog"));
+ add(new Values("chicken"));
+ add(new Values("cat"));
+ add(new Values("dog"));
+ add(new Values("apple"));
+ }});
+ put(1, new ArrayList<List<Object>>() {{
+ add(new Values("cat"));
+ add(new Values("dog"));
+ add(new Values("apple"));
+ add(new Values("banana"));
+ }});
+ put(2, new ArrayList<List<Object>>() {{
+ add(new Values("cat"));
+ add(new Values("cat"));
+ add(new Values("cat"));
+ add(new Values("cat"));
+ add(new Values("cat"));
+ add(new Values("dog"));
+ add(new Values("dog"));
+ add(new Values("dog"));
+ add(new Values("dog"));
+ }});
+ }};
+
+ public static class KeyedCountUpdater extends BaseTransactionalBolt implements ICommitter {
+ Map<String, Integer> _counts = new HashMap<String, Integer>();
+ BatchOutputCollector _collector;
+ TransactionAttempt _id;
+
+ int _count = 0;
+
+ @Override
+ public void prepare(Map conf, TopologyContext context, BatchOutputCollector collector, TransactionAttempt id) {
+ _collector = collector;
+ _id = id;
}
- else {
- newVal = val;
+
+ @Override
+ public void execute(Tuple tuple) {
+ String key = tuple.getString(1);
+ Integer curr = _counts.get(key);
+ if (curr == null)
+ curr = 0;
+ _counts.put(key, curr + 1);
}
- _collector.emit(new Values(_id, key, newVal.count, newVal.prev_count));
- }
- }
- @Override
- public void declareOutputFields(OutputFieldsDeclarer declarer) {
- declarer.declare(new Fields("id", "key", "count", "prev-count"));
- }
- }
-
- public static class Bucketize extends BaseBasicBolt {
- @Override
- public void execute(Tuple tuple, BasicOutputCollector collector) {
- TransactionAttempt attempt = (TransactionAttempt) tuple.getValue(0);
- int curr = tuple.getInteger(2);
- Integer prev = tuple.getInteger(3);
-
- int currBucket = curr / BUCKET_SIZE;
- Integer prevBucket = null;
- if (prev != null) {
- prevBucket = prev / BUCKET_SIZE;
- }
-
- if (prevBucket == null) {
- collector.emit(new Values(attempt, currBucket, 1));
- }
- else if (currBucket != prevBucket) {
- collector.emit(new Values(attempt, currBucket, 1));
- collector.emit(new Values(attempt, prevBucket, -1));
- }
+ @Override
+ public void finishBatch() {
+ for (String key : _counts.keySet()) {
+ CountValue val = COUNT_DATABASE.get(key);
+ CountValue newVal;
+ if (val == null || !val.txid.equals(_id)) {
+ newVal = new CountValue();
+ newVal.txid = _id.getTransactionId();
+ if (val != null) {
+ newVal.prev_count = val.count;
+ newVal.count = val.count;
+ }
+ newVal.count = newVal.count + _counts.get(key);
+ COUNT_DATABASE.put(key, newVal);
+ }
+ else {
+ newVal = val;
+ }
+ _collector.emit(new Values(_id, key, newVal.count, newVal.prev_count));
+ }
+ }
+
+ @Override
+ public void declareOutputFields(OutputFieldsDeclarer declarer) {
+ declarer.declare(new Fields("id", "key", "count", "prev-count"));
+ }
}
- @Override
- public void declareOutputFields(OutputFieldsDeclarer declarer) {
- declarer.declare(new Fields("attempt", "bucket", "delta"));
+ public static class Bucketize extends BaseBasicBolt {
+ @Override
+ public void execute(Tuple tuple, BasicOutputCollector collector) {
+ TransactionAttempt attempt = (TransactionAttempt) tuple.getValue(0);
+ int curr = tuple.getInteger(2);
+ Integer prev = tuple.getInteger(3);
+
+ int currBucket = curr / BUCKET_SIZE;
+ Integer prevBucket = null;
+ if (prev != null) {
+ prevBucket = prev / BUCKET_SIZE;
+ }
+
+ if (prevBucket == null) {
+ collector.emit(new Values(attempt, currBucket, 1));
+ }
+ else if (currBucket != prevBucket) {
+ collector.emit(new Values(attempt, currBucket, 1));
+ collector.emit(new Values(attempt, prevBucket, -1));
+ }
+ }
+
+ @Override
+ public void declareOutputFields(OutputFieldsDeclarer declarer) {
+ declarer.declare(new Fields("attempt", "bucket", "delta"));
+ }
}
- }
- public static class BucketCountUpdater extends BaseTransactionalBolt {
- Map<Integer, Integer> _accum = new HashMap<Integer, Integer>();
- BatchOutputCollector _collector;
- TransactionAttempt _attempt;
+ public static class BucketCountUpdater extends BaseTransactionalBolt {
+ Map<Integer, Integer> _accum = new HashMap<Integer, Integer>();
+ BatchOutputCollector _collector;
+ TransactionAttempt _attempt;
- int _count = 0;
+ int _count = 0;
- @Override
- public void prepare(Map conf, TopologyContext context, BatchOutputCollector collector, TransactionAttempt attempt) {
- _collector = collector;
- _attempt = attempt;
- }
+ @Override
+ public void prepare(Map conf, TopologyContext context, BatchOutputCollector collector, TransactionAttempt attempt) {
+ _collector = collector;
+ _attempt = attempt;
+ }
- @Override
- public void execute(Tuple tuple) {
- Integer bucket = tuple.getInteger(1);
- Integer delta = tuple.getInteger(2);
- Integer curr = _accum.get(bucket);
- if (curr == null)
- curr = 0;
- _accum.put(bucket, curr + delta);
- }
+ @Override
+ public void execute(Tuple tuple) {
+ Integer bucket = tuple.getInteger(1);
+ Integer delta = tuple.getInteger(2);
+ Integer curr = _accum.get(bucket);
+ if (curr == null)
+ curr = 0;
+ _accum.put(bucket, curr + delta);
+ }
- @Override
- public void finishBatch() {
- for (Integer bucket : _accum.keySet()) {
- BucketValue currVal = BUCKET_DATABASE.get(bucket);
- BucketValue newVal;
- if (currVal == null || !currVal.txid.equals(_attempt.getTransactionId())) {
- newVal = new BucketValue();
- newVal.txid = _attempt.getTransactionId();
- newVal.count = _accum.get(bucket);
- if (currVal != null)
- newVal.count += currVal.count;
- BUCKET_DATABASE.put(bucket, newVal);
+ @Override
+ public void finishBatch() {
+ for (Integer bucket : _accum.keySet()) {
+ BucketValue currVal = BUCKET_DATABASE.get(bucket);
+ BucketValue newVal;
+ if (currVal == null || !currVal.txid.equals(_attempt.getTransactionId())) {
+ newVal = new BucketValue();
+ newVal.txid = _attempt.getTransactionId();
+ newVal.count = _accum.get(bucket);
+ if (currVal != null)
+ newVal.count += currVal.count;
+ BUCKET_DATABASE.put(bucket, newVal);
+ }
+ else {
+ newVal = currVal;
+ }
+ _collector.emit(new Values(_attempt, bucket, newVal.count));
+ }
}
- else {
- newVal = currVal;
+
+ @Override
+ public void declareOutputFields(OutputFieldsDeclarer declarer) {
+ declarer.declare(new Fields("id", "bucket", "count"));
}
- _collector.emit(new Values(_attempt, bucket, newVal.count));
- }
}
- @Override
- public void declareOutputFields(OutputFieldsDeclarer declarer) {
- declarer.declare(new Fields("id", "bucket", "count"));
- }
- }
-
- public static void main(String[] args) throws Exception {
- MemoryTransactionalSpout spout = new MemoryTransactionalSpout(DATA, new Fields("word"), PARTITION_TAKE_PER_BATCH);
- TransactionalTopologyBuilder builder = new TransactionalTopologyBuilder("top-n-words", "spout", spout, 2);
- builder.setBolt("count", new KeyedCountUpdater(), 5).fieldsGrouping("spout", new Fields("word"));
- builder.setBolt("bucketize", new Bucketize()).noneGrouping("count");
- builder.setBolt("buckets", new BucketCountUpdater(), 5).fieldsGrouping("bucketize", new Fields("bucket"));
- Config config = new Config();
- config.setDebug(true);
- config.setMaxSpoutPending(3);
-
- try (LocalCluster cluster = new LocalCluster();
- LocalTopology topo = cluster.submitTopology("top-n-topology", config, builder.buildTopology());) {
- Thread.sleep(3000);
+ public static void main(String[] args) throws Exception {
+ if (!NimbusClient.isLocalOverride()) {
+ throw new IllegalStateException("This example only works in local mode. "
+ + "Run with storm local not storm jar");
+ }
+ MemoryTransactionalSpout spout = new MemoryTransactionalSpout(DATA, new Fields("word"), PARTITION_TAKE_PER_BATCH);
+ TransactionalTopologyBuilder builder = new TransactionalTopologyBuilder("top-n-words", "spout", spout, 2);
+ builder.setBolt("count", new KeyedCountUpdater(), 5).fieldsGrouping("spout", new Fields("word"));
+ builder.setBolt("bucketize", new Bucketize()).noneGrouping("count");
+ builder.setBolt("buckets", new BucketCountUpdater(), 5).fieldsGrouping("bucketize", new Fields("bucket"));
+ Config config = new Config();
+ config.setDebug(true);
+ config.setMaxSpoutPending(3);
+
+ StormSubmitter.submitTopology("top-n-topology", config, builder.buildTopology());
}
- }
}
http://git-wip-us.apache.org/repos/asf/storm/blob/b254ede4/examples/storm-starter/src/jvm/org/apache/storm/starter/WordCountTopology.java
----------------------------------------------------------------------
diff --git a/examples/storm-starter/src/jvm/org/apache/storm/starter/WordCountTopology.java b/examples/storm-starter/src/jvm/org/apache/storm/starter/WordCountTopology.java
index 576fcd9..59e3e4b 100644
--- a/examples/storm-starter/src/jvm/org/apache/storm/starter/WordCountTopology.java
+++ b/examples/storm-starter/src/jvm/org/apache/storm/starter/WordCountTopology.java
@@ -78,7 +78,7 @@ public class WordCountTopology extends ConfigurableTopology {
ConfigurableTopology.start(new WordCountTopology(), args);
}
- protected int run(String[] args) {
+ protected int run(String[] args) throws Exception {
TopologyBuilder builder = new TopologyBuilder();
@@ -91,17 +91,11 @@ public class WordCountTopology extends ConfigurableTopology {
String topologyName = "word-count";
- if (isLocal) {
- conf.setMaxTaskParallelism(3);
- ttl = 10;
- } else {
- conf.setNumWorkers(3);
- }
+ conf.setNumWorkers(3);
if (args != null && args.length > 0) {
topologyName = args[0];
}
-
return submit(topologyName, conf, builder);
}
}
http://git-wip-us.apache.org/repos/asf/storm/blob/b254ede4/examples/storm-starter/src/jvm/org/apache/storm/starter/WordCountTopologyNode.java
----------------------------------------------------------------------
diff --git a/examples/storm-starter/src/jvm/org/apache/storm/starter/WordCountTopologyNode.java b/examples/storm-starter/src/jvm/org/apache/storm/starter/WordCountTopologyNode.java
index 45014fc..39f5932 100644
--- a/examples/storm-starter/src/jvm/org/apache/storm/starter/WordCountTopologyNode.java
+++ b/examples/storm-starter/src/jvm/org/apache/storm/starter/WordCountTopologyNode.java
@@ -17,41 +17,43 @@
*/
package org.apache.storm.starter;
+import java.util.HashMap;
+import java.util.Map;
+
import org.apache.storm.Config;
-import org.apache.storm.LocalCluster;
-import org.apache.storm.LocalCluster.LocalTopology;
import org.apache.storm.StormSubmitter;
import org.apache.storm.spout.ShellSpout;
import org.apache.storm.task.ShellBolt;
-import org.apache.storm.topology.*;
+import org.apache.storm.topology.BasicOutputCollector;
+import org.apache.storm.topology.IRichBolt;
+import org.apache.storm.topology.IRichSpout;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.topology.base.BaseBasicBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;
-import java.util.HashMap;
-import java.util.Map;
-
/**
* This topology demonstrates Storm's stream groupings and multilang capabilities.
*/
public class WordCountTopologyNode {
- public static class SplitSentence extends ShellBolt implements IRichBolt {
+ public static class SplitSentence extends ShellBolt implements IRichBolt {
- public SplitSentence() {
- super("node", "splitsentence.js");
- }
+ public SplitSentence() {
+ super("node", "splitsentence.js");
+ }
- @Override
- public void declareOutputFields(OutputFieldsDeclarer declarer) {
- declarer.declare(new Fields("word"));
- }
+ @Override
+ public void declareOutputFields(OutputFieldsDeclarer declarer) {
+ declarer.declare(new Fields("word"));
+ }
- @Override
- public Map<String, Object> getComponentConfiguration() {
- return null;
+ @Override
+ public Map<String, Object> getComponentConfiguration() {
+ return null;
+ }
}
- }
public static class RandomSentence extends ShellSpout implements IRichSpout {
@@ -70,51 +72,42 @@ public class WordCountTopologyNode {
}
}
- public static class WordCount extends BaseBasicBolt {
- Map<String, Integer> counts = new HashMap<String, Integer>();
-
- @Override
- public void execute(Tuple tuple, BasicOutputCollector collector) {
- String word = tuple.getString(0);
- Integer count = counts.get(word);
- if (count == null)
- count = 0;
- count++;
- counts.put(word, count);
- collector.emit(new Values(word, count));
- }
-
- @Override
- public void declareOutputFields(OutputFieldsDeclarer declarer) {
- declarer.declare(new Fields("word", "count"));
- }
- }
-
- public static void main(String[] args) throws Exception {
+ public static class WordCount extends BaseBasicBolt {
+ Map<String, Integer> counts = new HashMap<String, Integer>();
- TopologyBuilder builder = new TopologyBuilder();
-
- builder.setSpout("spout", new RandomSentence(), 5);
+ @Override
+ public void execute(Tuple tuple, BasicOutputCollector collector) {
+ String word = tuple.getString(0);
+ Integer count = counts.get(word);
+ if (count == null)
+ count = 0;
+ count++;
+ counts.put(word, count);
+ collector.emit(new Values(word, count));
+ }
- builder.setBolt("split", new SplitSentence(), 8).shuffleGrouping("spout");
- builder.setBolt("count", new WordCount(), 12).fieldsGrouping("split", new Fields("word"));
+ @Override
+ public void declareOutputFields(OutputFieldsDeclarer declarer) {
+ declarer.declare(new Fields("word", "count"));
+ }
+ }
- Config conf = new Config();
- conf.setDebug(true);
+ public static void main(String[] args) throws Exception {
+ TopologyBuilder builder = new TopologyBuilder();
- if (args != null && args.length > 0) {
- conf.setNumWorkers(3);
+ builder.setSpout("spout", new RandomSentence(), 5);
- StormSubmitter.submitTopologyWithProgressBar(args[0], conf, builder.createTopology());
- }
- else {
- conf.setMaxTaskParallelism(3);
+ builder.setBolt("split", new SplitSentence(), 8).shuffleGrouping("spout");
+ builder.setBolt("count", new WordCount(), 12).fieldsGrouping("split", new Fields("word"));
- try (LocalCluster cluster = new LocalCluster();
- LocalTopology topo = cluster.submitTopology("word-count", conf, builder.createTopology());) {
- Thread.sleep(10000);
- }
+ Config conf = new Config();
+ conf.setDebug(true);
+ String topoName = "word-count";
+ if (args != null && args.length > 0) {
+ topoName = args[0];
+ }
+ conf.setNumWorkers(3);
+ StormSubmitter.submitTopologyWithProgressBar(topoName, conf, builder.createTopology());
}
- }
}
http://git-wip-us.apache.org/repos/asf/storm/blob/b254ede4/examples/storm-starter/src/jvm/org/apache/storm/starter/streams/AggregateExample.java
----------------------------------------------------------------------
diff --git a/examples/storm-starter/src/jvm/org/apache/storm/starter/streams/AggregateExample.java b/examples/storm-starter/src/jvm/org/apache/storm/starter/streams/AggregateExample.java
index cbc5d45..dae774b 100644
--- a/examples/storm-starter/src/jvm/org/apache/storm/starter/streams/AggregateExample.java
+++ b/examples/storm-starter/src/jvm/org/apache/storm/starter/streams/AggregateExample.java
@@ -18,7 +18,6 @@
package org.apache.storm.starter.streams;
import org.apache.storm.Config;
-import org.apache.storm.LocalCluster;
import org.apache.storm.StormSubmitter;
import org.apache.storm.starter.spout.RandomIntegerSpout;
import org.apache.storm.streams.Pair;
@@ -27,7 +26,6 @@ import org.apache.storm.streams.operations.CombinerAggregator;
import org.apache.storm.streams.operations.mappers.ValueMapper;
import org.apache.storm.streams.windowing.TumblingWindows;
import org.apache.storm.topology.base.BaseWindowedBolt;
-import org.apache.storm.utils.Utils;
/**
* An example that illustrates the global aggregate
@@ -48,15 +46,12 @@ public class AggregateExample {
.print();
Config config = new Config();
+ String topoName = "AGG_EXAMPLE";
if (args.length > 0) {
- config.setNumWorkers(1);
- StormSubmitter.submitTopologyWithProgressBar(args[0], config, builder.build());
- } else {
- try (LocalCluster cluster = new LocalCluster();
- LocalCluster.LocalTopology topo = cluster.submitTopology("test", config, builder.build())) {
- Utils.sleep(60_000);
- }
+ topoName = args[0];
}
+ config.setNumWorkers(1);
+ StormSubmitter.submitTopologyWithProgressBar(topoName, config, builder.build());
}
private static class Avg implements CombinerAggregator<Integer, Pair<Integer, Integer>, Double> {
http://git-wip-us.apache.org/repos/asf/storm/blob/b254ede4/examples/storm-starter/src/jvm/org/apache/storm/starter/streams/BranchExample.java
----------------------------------------------------------------------
diff --git a/examples/storm-starter/src/jvm/org/apache/storm/starter/streams/BranchExample.java b/examples/storm-starter/src/jvm/org/apache/storm/starter/streams/BranchExample.java
index 027b432..f8d865f 100644
--- a/examples/storm-starter/src/jvm/org/apache/storm/starter/streams/BranchExample.java
+++ b/examples/storm-starter/src/jvm/org/apache/storm/starter/streams/BranchExample.java
@@ -18,14 +18,12 @@
package org.apache.storm.starter.streams;
import org.apache.storm.Config;
-import org.apache.storm.LocalCluster;
import org.apache.storm.StormSubmitter;
import org.apache.storm.starter.spout.RandomIntegerSpout;
import org.apache.storm.streams.Stream;
import org.apache.storm.streams.StreamBuilder;
import org.apache.storm.streams.operations.Predicate;
import org.apache.storm.streams.operations.mappers.ValueMapper;
-import org.apache.storm.utils.Utils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -57,15 +55,13 @@ public class BranchExample {
evenAndOdd[1].forEach(x -> LOG.info("ODD > " + x));
Config config = new Config();
+ String topoName = "branchExample";
if (args.length > 0) {
- config.setNumWorkers(1);
- StormSubmitter.submitTopologyWithProgressBar(args[0], config, builder.build());
- } else {
- try (LocalCluster cluster = new LocalCluster();
- LocalCluster.LocalTopology topo = cluster.submitTopology("test", config, builder.build())) {
- Utils.sleep(60_000);
- }
+ topoName = args[0];
}
+
+ config.setNumWorkers(1);
+ StormSubmitter.submitTopologyWithProgressBar(topoName, config, builder.build());
}
}
http://git-wip-us.apache.org/repos/asf/storm/blob/b254ede4/examples/storm-starter/src/jvm/org/apache/storm/starter/streams/GroupByKeyAndWindowExample.java
----------------------------------------------------------------------
diff --git a/examples/storm-starter/src/jvm/org/apache/storm/starter/streams/GroupByKeyAndWindowExample.java b/examples/storm-starter/src/jvm/org/apache/storm/starter/streams/GroupByKeyAndWindowExample.java
index dd7e97f..d989433 100644
--- a/examples/storm-starter/src/jvm/org/apache/storm/starter/streams/GroupByKeyAndWindowExample.java
+++ b/examples/storm-starter/src/jvm/org/apache/storm/starter/streams/GroupByKeyAndWindowExample.java
@@ -17,8 +17,11 @@
*/
package org.apache.storm.starter.streams;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+
import org.apache.storm.Config;
-import org.apache.storm.LocalCluster;
import org.apache.storm.StormSubmitter;
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.streams.PairStream;
@@ -35,10 +38,6 @@ import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;
import org.apache.storm.utils.Utils;
-import java.util.Arrays;
-import java.util.List;
-import java.util.Map;
-
/**
* An example that shows the usage of {@link PairStream#groupByKeyAndWindow(Window)}
* and {@link PairStream#reduceByKeyAndWindow(Reducer, Window)}
@@ -72,15 +71,12 @@ public class GroupByKeyAndWindowExample {
.print();
Config config = new Config();
+ String topoName = GroupByKeyAndWindowExample.class.getName();
if (args.length > 0) {
- config.setNumWorkers(1);
- StormSubmitter.submitTopologyWithProgressBar(args[0], config, builder.build());
- } else {
- try (LocalCluster cluster = new LocalCluster();
- LocalCluster.LocalTopology topo = cluster.submitTopology("test", config, builder.build())) {
- Utils.sleep(60_000);
- }
+ topoName = args[0];
}
+ config.setNumWorkers(1);
+ StormSubmitter.submitTopologyWithProgressBar(topoName, config, builder.build());
}
private static class StockQuotes extends BaseRichSpout {
http://git-wip-us.apache.org/repos/asf/storm/blob/b254ede4/examples/storm-starter/src/jvm/org/apache/storm/starter/streams/JoinExample.java
----------------------------------------------------------------------
diff --git a/examples/storm-starter/src/jvm/org/apache/storm/starter/streams/JoinExample.java b/examples/storm-starter/src/jvm/org/apache/storm/starter/streams/JoinExample.java
index 4aa6253..ae8be3d 100644
--- a/examples/storm-starter/src/jvm/org/apache/storm/starter/streams/JoinExample.java
+++ b/examples/storm-starter/src/jvm/org/apache/storm/starter/streams/JoinExample.java
@@ -17,8 +17,9 @@
*/
package org.apache.storm.starter.streams;
+import java.util.Map;
+
import org.apache.storm.Config;
-import org.apache.storm.LocalCluster;
import org.apache.storm.StormSubmitter;
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.streams.PairStream;
@@ -29,14 +30,11 @@ import org.apache.storm.streams.windowing.TumblingWindows;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichSpout;
+import org.apache.storm.topology.base.BaseWindowedBolt.Duration;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;
import org.apache.storm.utils.Utils;
-import java.util.Map;
-
-import static org.apache.storm.topology.base.BaseWindowedBolt.Duration;
-
/**
* An example that demonstrates the usage of {@link PairStream#join(PairStream)} to join
* multiple streams.
@@ -67,16 +65,12 @@ public class JoinExample {
.print();
Config config = new Config();
+ String topoName = JoinExample.class.getName();
if (args.length > 0) {
- config.setNumWorkers(1);
- StormSubmitter.submitTopologyWithProgressBar(args[0], config, builder.build());
- } else {
- try (LocalCluster cluster = new LocalCluster();
- LocalCluster.LocalTopology topo = cluster.submitTopology("test", config, builder.build())) {
- Utils.sleep(60_000);
- }
+ topoName = args[0];
}
-
+ config.setNumWorkers(1);
+ StormSubmitter.submitTopologyWithProgressBar(topoName, config, builder.build());
}
private static class NumberSpout extends BaseRichSpout {
http://git-wip-us.apache.org/repos/asf/storm/blob/b254ede4/examples/storm-starter/src/jvm/org/apache/storm/starter/streams/StateQueryExample.java
----------------------------------------------------------------------
diff --git a/examples/storm-starter/src/jvm/org/apache/storm/starter/streams/StateQueryExample.java b/examples/storm-starter/src/jvm/org/apache/storm/starter/streams/StateQueryExample.java
index ab6cac3..d7eb16b 100644
--- a/examples/storm-starter/src/jvm/org/apache/storm/starter/streams/StateQueryExample.java
+++ b/examples/storm-starter/src/jvm/org/apache/storm/starter/streams/StateQueryExample.java
@@ -17,8 +17,9 @@
*/
package org.apache.storm.starter.streams;
+import java.util.Map;
+
import org.apache.storm.Config;
-import org.apache.storm.LocalCluster;
import org.apache.storm.StormSubmitter;
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.streams.Pair;
@@ -34,8 +35,6 @@ import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;
import org.apache.storm.utils.Utils;
-import java.util.Map;
-
/**
* An example that uses {@link Stream#stateQuery(StreamState)} to query the state
* <p>
@@ -79,16 +78,12 @@ public class StateQueryExample {
Config config = new Config();
// use redis based state store for persistence
config.put(Config.TOPOLOGY_STATE_PROVIDER, "org.apache.storm.redis.state.RedisKeyValueStateProvider");
-
+ String topoName = "test";
if (args.length > 0) {
- config.setNumWorkers(1);
- StormSubmitter.submitTopologyWithProgressBar(args[0], config, builder.build());
- } else {
- try (LocalCluster cluster = new LocalCluster();
- LocalCluster.LocalTopology topo = cluster.submitTopology("test", config, builder.build())) {
- Utils.sleep(60_000);
- }
+ topoName = args[0];
}
+ config.setNumWorkers(1);
+ StormSubmitter.submitTopologyWithProgressBar(topoName, config, builder.build());
}
private static class QuerySpout extends BaseRichSpout {
http://git-wip-us.apache.org/repos/asf/storm/blob/b254ede4/examples/storm-starter/src/jvm/org/apache/storm/starter/streams/StatefulWordCount.java
----------------------------------------------------------------------
diff --git a/examples/storm-starter/src/jvm/org/apache/storm/starter/streams/StatefulWordCount.java b/examples/storm-starter/src/jvm/org/apache/storm/starter/streams/StatefulWordCount.java
index ddd318a..aba19f3 100644
--- a/examples/storm-starter/src/jvm/org/apache/storm/starter/streams/StatefulWordCount.java
+++ b/examples/storm-starter/src/jvm/org/apache/storm/starter/streams/StatefulWordCount.java
@@ -18,7 +18,6 @@
package org.apache.storm.starter.streams;
import org.apache.storm.Config;
-import org.apache.storm.LocalCluster;
import org.apache.storm.StormSubmitter;
import org.apache.storm.streams.Pair;
import org.apache.storm.streams.PairStream;
@@ -28,7 +27,6 @@ import org.apache.storm.streams.operations.mappers.ValueMapper;
import org.apache.storm.streams.windowing.TumblingWindows;
import org.apache.storm.testing.TestWordSpout;
import org.apache.storm.topology.base.BaseWindowedBolt;
-import org.apache.storm.utils.Utils;
/**
* A stateful word count that uses {@link PairStream#updateStateByKey(StateUpdater)} to
@@ -74,15 +72,11 @@ public class StatefulWordCount {
Config config = new Config();
// use redis based state store for persistence
config.put(Config.TOPOLOGY_STATE_PROVIDER, "org.apache.storm.redis.state.RedisKeyValueStateProvider");
-
+ String topoName = "test";
if (args.length > 0) {
- config.setNumWorkers(1);
- StormSubmitter.submitTopologyWithProgressBar(args[0], config, builder.build());
- } else {
- try (LocalCluster cluster = new LocalCluster();
- LocalCluster.LocalTopology topo = cluster.submitTopology("test", config, builder.build())) {
- Utils.sleep(60_000);
- }
+ topoName = args[0];
}
+ config.setNumWorkers(1);
+ StormSubmitter.submitTopologyWithProgressBar(topoName, config, builder.build());
}
}
http://git-wip-us.apache.org/repos/asf/storm/blob/b254ede4/examples/storm-starter/src/jvm/org/apache/storm/starter/streams/TypedTupleExample.java
----------------------------------------------------------------------
diff --git a/examples/storm-starter/src/jvm/org/apache/storm/starter/streams/TypedTupleExample.java b/examples/storm-starter/src/jvm/org/apache/storm/starter/streams/TypedTupleExample.java
index 11e89bf..a74151b 100644
--- a/examples/storm-starter/src/jvm/org/apache/storm/starter/streams/TypedTupleExample.java
+++ b/examples/storm-starter/src/jvm/org/apache/storm/starter/streams/TypedTupleExample.java
@@ -18,7 +18,6 @@
package org.apache.storm.starter.streams;
import org.apache.storm.Config;
-import org.apache.storm.LocalCluster;
import org.apache.storm.StormSubmitter;
import org.apache.storm.starter.spout.RandomIntegerSpout;
import org.apache.storm.streams.Pair;
@@ -28,15 +27,12 @@ import org.apache.storm.streams.StreamBuilder;
import org.apache.storm.streams.operations.mappers.TupleValueMappers;
import org.apache.storm.streams.tuple.Tuple3;
import org.apache.storm.streams.windowing.TumblingWindows;
-import org.apache.storm.utils.Utils;
-
-import static org.apache.storm.topology.base.BaseWindowedBolt.Count;
+import org.apache.storm.topology.base.BaseWindowedBolt.Count;
/**
* An example that illustrates the usage of typed tuples (TupleN<..>) and {@link TupleValueMappers}.
*/
public class TypedTupleExample {
- @SuppressWarnings("unchecked")
public static void main(String[] args) throws Exception {
StreamBuilder builder = new StreamBuilder();
/**
@@ -49,15 +45,12 @@ public class TypedTupleExample {
pairs.window(TumblingWindows.of(Count.of(10))).groupByKey().print();
- Config config = new Config();
+ String topoName = "test";
if (args.length > 0) {
- config.setNumWorkers(1);
- StormSubmitter.submitTopologyWithProgressBar(args[0], config, builder.build());
- } else {
- try (LocalCluster cluster = new LocalCluster();
- LocalCluster.LocalTopology topo = cluster.submitTopology("test", config, builder.build())) {
- Utils.sleep(60_000);
- }
+ topoName = args[0];
}
+ Config config = new Config();
+ config.setNumWorkers(1);
+ StormSubmitter.submitTopologyWithProgressBar(topoName, config, builder.build());
}
}
http://git-wip-us.apache.org/repos/asf/storm/blob/b254ede4/examples/storm-starter/src/jvm/org/apache/storm/starter/streams/WindowedWordCount.java
----------------------------------------------------------------------
diff --git a/examples/storm-starter/src/jvm/org/apache/storm/starter/streams/WindowedWordCount.java b/examples/storm-starter/src/jvm/org/apache/storm/starter/streams/WindowedWordCount.java
index 0f30b7c..50c8aad 100644
--- a/examples/storm-starter/src/jvm/org/apache/storm/starter/streams/WindowedWordCount.java
+++ b/examples/storm-starter/src/jvm/org/apache/storm/starter/streams/WindowedWordCount.java
@@ -17,19 +17,16 @@
*/
package org.apache.storm.starter.streams;
+import java.util.Arrays;
+
import org.apache.storm.Config;
-import org.apache.storm.LocalCluster;
import org.apache.storm.StormSubmitter;
import org.apache.storm.starter.spout.RandomSentenceSpout;
import org.apache.storm.streams.Pair;
import org.apache.storm.streams.StreamBuilder;
import org.apache.storm.streams.operations.mappers.ValueMapper;
import org.apache.storm.streams.windowing.TumblingWindows;
-import org.apache.storm.utils.Utils;
-
-import java.util.Arrays;
-
-import static org.apache.storm.topology.base.BaseWindowedBolt.Duration;
+import org.apache.storm.topology.base.BaseWindowedBolt.Duration;
/**
* A windowed word count example
@@ -66,14 +63,11 @@ public class WindowedWordCount {
.print();
Config config = new Config();
+ String topoName = "test";
if (args.length > 0) {
- config.setNumWorkers(1);
- StormSubmitter.submitTopologyWithProgressBar(args[0], config, builder.build());
- } else {
- try (LocalCluster cluster = new LocalCluster();
- LocalCluster.LocalTopology topo = cluster.submitTopology("test", config, builder.build())) {
- Utils.sleep(60_000);
- }
+ topoName = args[0];
}
+ config.setNumWorkers(1);
+ StormSubmitter.submitTopologyWithProgressBar(topoName, config, builder.build());
}
}
http://git-wip-us.apache.org/repos/asf/storm/blob/b254ede4/examples/storm-starter/src/jvm/org/apache/storm/starter/streams/WordCountToBolt.java
----------------------------------------------------------------------
diff --git a/examples/storm-starter/src/jvm/org/apache/storm/starter/streams/WordCountToBolt.java b/examples/storm-starter/src/jvm/org/apache/storm/starter/streams/WordCountToBolt.java
index 1c0aae1..360f0ad 100644
--- a/examples/storm-starter/src/jvm/org/apache/storm/starter/streams/WordCountToBolt.java
+++ b/examples/storm-starter/src/jvm/org/apache/storm/starter/streams/WordCountToBolt.java
@@ -18,7 +18,6 @@
package org.apache.storm.starter.streams;
import org.apache.storm.Config;
-import org.apache.storm.LocalCluster;
import org.apache.storm.StormSubmitter;
import org.apache.storm.redis.bolt.RedisStoreBolt;
import org.apache.storm.redis.common.config.JedisPoolConfig;
@@ -30,7 +29,6 @@ import org.apache.storm.streams.operations.mappers.ValueMapper;
import org.apache.storm.testing.TestWordSpout;
import org.apache.storm.topology.IRichBolt;
import org.apache.storm.tuple.ITuple;
-import org.apache.storm.utils.Utils;
/**
* An example that computes word counts and finally emits the results to an
@@ -67,15 +65,12 @@ public class WordCountToBolt {
.to(redisStoreBolt);
Config config = new Config();
+ String topoName = "test";
if (args.length > 0) {
- config.setNumWorkers(1);
- StormSubmitter.submitTopologyWithProgressBar(args[0], config, builder.build());
- } else {
- try (LocalCluster cluster = new LocalCluster();
- LocalCluster.LocalTopology topo = cluster.submitTopology("test", config, builder.build())) {
- Utils.sleep(60_000);
- }
+ topoName = args[0];
}
+ config.setNumWorkers(1);
+ StormSubmitter.submitTopologyWithProgressBar(topoName, config, builder.build());
}
// Maps a storm tuple to redis key and value
http://git-wip-us.apache.org/repos/asf/storm/blob/b254ede4/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentHBaseWindowingStoreTopology.java
----------------------------------------------------------------------
diff --git a/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentHBaseWindowingStoreTopology.java b/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentHBaseWindowingStoreTopology.java
index 70a23b8..fa9274d 100644
--- a/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentHBaseWindowingStoreTopology.java
+++ b/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentHBaseWindowingStoreTopology.java
@@ -18,9 +18,9 @@
*/
package org.apache.storm.starter.trident;
+import java.util.HashMap;
+
import org.apache.storm.Config;
-import org.apache.storm.LocalCluster;
-import org.apache.storm.LocalCluster.LocalTopology;
import org.apache.storm.StormSubmitter;
import org.apache.storm.generated.StormTopology;
import org.apache.storm.hbase.trident.windowing.HBaseWindowsStoreFactory;
@@ -35,12 +35,9 @@ import org.apache.storm.trident.windowing.WindowsStoreFactory;
import org.apache.storm.trident.windowing.config.TumblingCountWindow;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;
-import org.apache.storm.utils.Utils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.HashMap;
-
/**
* Sample application of trident windowing which uses {@link HBaseWindowsStoreFactory}'s store for storing tuples in window.
*
@@ -76,17 +73,12 @@ public class TridentHBaseWindowingStoreTopology {
// window-state table should already be created with cf:tuples column
HBaseWindowsStoreFactory windowStoreFactory = new HBaseWindowsStoreFactory(new HashMap<String, Object>(), "window-state", "cf".getBytes("UTF-8"), "tuples".getBytes("UTF-8"));
-
- if (args.length == 0) {
- try (LocalCluster cluster = new LocalCluster();
- LocalTopology topo = cluster.submitTopology("wordCounterWithWindowing", conf, buildTopology(windowStoreFactory));) {
- Utils.sleep(120 * 1000);
- }
- System.exit(0);
- } else {
- conf.setNumWorkers(3);
- StormSubmitter.submitTopologyWithProgressBar(args[0], conf, buildTopology(windowStoreFactory));
+ String topoName = "wordCounterWithWindowing";
+ if (args.length > 0) {
+ topoName = args[0];
}
+ conf.setNumWorkers(3);
+ StormSubmitter.submitTopologyWithProgressBar(topoName, conf, buildTopology(windowStoreFactory));
}
}
http://git-wip-us.apache.org/repos/asf/storm/blob/b254ede4/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentMapExample.java
----------------------------------------------------------------------
diff --git a/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentMapExample.java b/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentMapExample.java
index 5ddace8..c5f73ff 100644
--- a/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentMapExample.java
+++ b/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentMapExample.java
@@ -17,10 +17,10 @@
*/
package org.apache.storm.starter.trident;
+import java.util.ArrayList;
+import java.util.List;
+
import org.apache.storm.Config;
-import org.apache.storm.LocalCluster;
-import org.apache.storm.LocalCluster.LocalTopology;
-import org.apache.storm.LocalDRPC;
import org.apache.storm.StormSubmitter;
import org.apache.storm.generated.StormTopology;
import org.apache.storm.trident.TridentState;
@@ -39,9 +39,7 @@ import org.apache.storm.trident.testing.MemoryMapState;
import org.apache.storm.trident.tuple.TridentTuple;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;
-
-import java.util.ArrayList;
-import java.util.List;
+import org.apache.storm.utils.DRPCClient;
/**
* A simple example that demonstrates the usage of {@link org.apache.storm.trident.Stream#map(MapFunction)} and
@@ -74,7 +72,7 @@ public class TridentMapExample {
}
};
- public static StormTopology buildTopology(LocalDRPC drpc) {
+ public static StormTopology buildTopology() {
FixedBatchSpout spout = new FixedBatchSpout(
new Fields("word"), 3, new Values("the cow jumped over the moon"),
new Values("the man went to the store and bought some candy"), new Values("four score and seven years ago"),
@@ -96,7 +94,7 @@ public class TridentMapExample {
.persistentAggregate(new MemoryMapState.Factory(), new Count(), new Fields("count"))
.parallelismHint(16);
- topology.newDRPCStream("words", drpc)
+ topology.newDRPCStream("words")
.flatMap(split, new Fields("word"))
.groupBy(new Fields("word"))
.stateQuery(wordCounts, new Fields("word"), new MapGet(), new Fields("count"))
@@ -108,18 +106,17 @@ public class TridentMapExample {
public static void main(String[] args) throws Exception {
Config conf = new Config();
conf.setMaxSpoutPending(20);
- if (args.length == 0) {
- try (LocalDRPC drpc = new LocalDRPC();
- LocalCluster cluster = new LocalCluster();
- LocalTopology topo = cluster.submitTopology("wordCounter", conf, buildTopology(drpc));) {
- for (int i = 0; i < 100; i++) {
- System.out.println("DRPC RESULT: " + drpc.execute("words", "CAT THE DOG JUMPED"));
- Thread.sleep(1000);
- }
+ String topoName = "wordCounter";
+ if (args.length > 0) {
+ topoName = args[0];
+ }
+ conf.setNumWorkers(3);
+ StormSubmitter.submitTopologyWithProgressBar(topoName, conf, buildTopology());
+ try (DRPCClient drpc = DRPCClient.getConfiguredClient(conf)) {
+ for (int i = 0; i < 10; i++) {
+ System.out.println("DRPC RESULT: " + drpc.execute("words", "CAT THE DOG JUMPED"));
+ Thread.sleep(1000);
}
- } else {
- conf.setNumWorkers(3);
- StormSubmitter.submitTopologyWithProgressBar(args[0], conf, buildTopology(null));
}
}
}
http://git-wip-us.apache.org/repos/asf/storm/blob/b254ede4/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentMinMaxOfDevicesTopology.java
----------------------------------------------------------------------
diff --git a/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentMinMaxOfDevicesTopology.java b/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentMinMaxOfDevicesTopology.java
index e5a775b..1cc33a9 100644
--- a/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentMinMaxOfDevicesTopology.java
+++ b/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentMinMaxOfDevicesTopology.java
@@ -17,9 +17,12 @@
*/
package org.apache.storm.starter.trident;
+import java.io.Serializable;
+import java.util.Comparator;
+import java.util.List;
+import java.util.concurrent.ThreadLocalRandom;
+
import org.apache.storm.Config;
-import org.apache.storm.LocalCluster;
-import org.apache.storm.LocalCluster.LocalTopology;
import org.apache.storm.StormSubmitter;
import org.apache.storm.generated.StormTopology;
import org.apache.storm.starter.spout.RandomNumberGeneratorSpout;
@@ -30,12 +33,6 @@ import org.apache.storm.trident.testing.FixedBatchSpout;
import org.apache.storm.trident.tuple.TridentTuple;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;
-import org.apache.storm.utils.Utils;
-
-import java.io.Serializable;
-import java.util.Comparator;
-import java.util.List;
-import java.util.concurrent.ThreadLocalRandom;
/**
* This class demonstrates different usages of
@@ -113,16 +110,8 @@ public class TridentMinMaxOfDevicesTopology {
StormTopology topology = buildDevicesTopology();
Config conf = new Config();
conf.setMaxSpoutPending(20);
- if (args.length == 0) {
- try (LocalCluster cluster = new LocalCluster();
- LocalTopology topo = cluster.submitTopology("devices-topology", conf, topology);) {
- Utils.sleep(60 * 1000);
- }
- System.exit(0);
- } else {
- conf.setNumWorkers(3);
- StormSubmitter.submitTopologyWithProgressBar("devices-topology", conf, topology);
- }
+ conf.setNumWorkers(3);
+ StormSubmitter.submitTopologyWithProgressBar("devices-topology", conf, topology);
}
static class SpeedComparator implements Comparator<TridentTuple>, Serializable {