You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by bo...@apache.org on 2016/01/11 21:57:42 UTC
[46/53] [abbrv] [partial] storm git commit: STORM-1202: Migrate APIs
to org.apache.storm, but try to provide some form of backwards compatibility
http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/examples/storm-starter/src/jvm/storm/starter/ThroughputVsLatency.java
----------------------------------------------------------------------
diff --git a/examples/storm-starter/src/jvm/storm/starter/ThroughputVsLatency.java b/examples/storm-starter/src/jvm/storm/starter/ThroughputVsLatency.java
deleted file mode 100644
index 4c6680e..0000000
--- a/examples/storm-starter/src/jvm/storm/starter/ThroughputVsLatency.java
+++ /dev/null
@@ -1,432 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package storm.starter;
-
-import backtype.storm.Config;
-import backtype.storm.LocalCluster;
-import backtype.storm.StormSubmitter;
-import backtype.storm.metric.HttpForwardingMetricsServer;
-import backtype.storm.metric.HttpForwardingMetricsConsumer;
-import backtype.storm.metric.api.IMetric;
-import backtype.storm.metric.api.IMetricsConsumer.TaskInfo;
-import backtype.storm.metric.api.IMetricsConsumer.DataPoint;
-import backtype.storm.generated.*;
-import backtype.storm.spout.SpoutOutputCollector;
-import backtype.storm.task.TopologyContext;
-import backtype.storm.topology.BasicOutputCollector;
-import backtype.storm.topology.IRichBolt;
-import backtype.storm.topology.OutputFieldsDeclarer;
-import backtype.storm.topology.TopologyBuilder;
-import backtype.storm.topology.base.BaseBasicBolt;
-import backtype.storm.topology.base.BaseRichSpout;
-import backtype.storm.tuple.Fields;
-import backtype.storm.tuple.Tuple;
-import backtype.storm.tuple.Values;
-import backtype.storm.utils.NimbusClient;
-import backtype.storm.utils.Utils;
-import backtype.storm.StormSubmitter;
-
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Random;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ThreadLocalRandom;
-import java.util.concurrent.atomic.AtomicLong;
-
-import org.apache.storm.metrics.hdrhistogram.HistogramMetric;
-import org.HdrHistogram.Histogram;
-
-/**
- * WordCount but the spout goes at a predefined rate and we collect
- * proper latency statistics.
- */
-public class ThroughputVsLatency {
- /**
-  * Message id attached to each tuple emitted by the spout: the sentence text plus the
-  * nanosecond timestamp that the ack handler subtracts from to obtain latency.
-  */
- private static class SentWithTime {
-     /** Sentence carried by the tuple; re-emitted unchanged when the tuple fails. */
-     public final String sentence;
-     /** Timestamp, on the {@code System.nanoTime()} scale, that latency is measured from. */
-     public final long time;
-
-     SentWithTime(String sentence, long time) {
-         this.time = time;
-         this.sentence = sentence;
-     }
- }
-
- /**
-  * Minimal cluster facade: delegates to an in-process {@link LocalCluster} when the merged
-  * configuration has {@code "run.local"} set to true, and to a Nimbus client otherwise.
-  * Exactly one of the two delegates is non-null after construction.
-  */
- public static class C {
-     LocalCluster _local = null;
-     Nimbus.Client _client = null;
-
-     /**
-      * @param conf optional overrides applied on top of {@code Utils.readStormConfig()}; may be null
-      */
-     public C(Map conf) {
-         Map merged = Utils.readStormConfig();
-         if (conf != null) {
-             merged.putAll(conf);
-         }
-         Boolean localFlag = (Boolean) merged.get("run.local");
-         if (localFlag == null || !localFlag) {
-             _client = NimbusClient.getConfiguredClient(merged).getClient();
-         } else {
-             _local = new LocalCluster();
-         }
-     }
-
-     /** Returns the cluster summary from whichever delegate is active. */
-     public ClusterSummary getClusterInfo() throws Exception {
-         if (_local == null) {
-             return _client.getClusterInfo();
-         }
-         return _local.getClusterInfo();
-     }
-
-     /** Returns details of the topology with the given id from the active delegate. */
-     public TopologyInfo getTopologyInfo(String id) throws Exception {
-         if (_local == null) {
-             return _client.getTopologyInfo(id);
-         }
-         return _local.getTopologyInfo(id);
-     }
-
-     /** Kills the named topology using the supplied options. */
-     public void killTopologyWithOpts(String name, KillOptions opts) throws Exception {
-         if (_local == null) {
-             _client.killTopologyWithOpts(name, opts);
-             return;
-         }
-         _local.killTopologyWithOpts(name, opts);
-     }
-
-     /** Submits locally in local mode, otherwise through {@code StormSubmitter}. */
-     public void submitTopology(String name, Map stormConf, StormTopology topology) throws Exception {
-         if (_local == null) {
-             StormSubmitter.submitTopology(name, stormConf, topology);
-             return;
-         }
-         _local.submitTopology(name, stormConf, topology);
-     }
-
-     /** @return true when an in-process cluster was created by the constructor */
-     public boolean isLocal() {
-         return _local != null;
-     }
- }
-
- /**
-  * Spout that emits randomly chosen sentences at a fixed target rate and records the
-  * ack latency of every tuple in the "comp-lat-histo" histogram metric.
-  * <p>
-  * Emission is paced in windows of {@code _periodNano} nanoseconds; each window grants
-  * {@code _emitAmount} emits. A non-positive rate selects a period of
-  * {@code Long.MAX_VALUE - 1}, i.e. the spout goes idle after its first refill.
-  */
- public static class FastRandomSentenceSpout extends BaseRichSpout {
-     static final String[] SENTENCES = new String[]{ "the cow jumped over the moon", "an apple a day keeps the doctor away",
-         "four score and seven years ago", "snow white and the seven dwarfs", "i am at two with nature" };
-
-     SpoutOutputCollector _collector;
-     long _periodNano;
-     long _emitAmount;
-     Random _rand;
-     long _nextEmitTime;
-     long _emitsLeft;
-     HistogramMetric _histo;
-
-     /**
-      * @param ratePerSecond target number of tuples per second for this spout instance;
-      *                      values <= 0 effectively disable periodic emission
-      */
-     public FastRandomSentenceSpout(long ratePerSecond) {
-         if (ratePerSecond > 0) {
-             _periodNano = Math.max(1, 1000000000/ratePerSecond);
-             _emitAmount = Math.max(1, (long)((ratePerSecond / 1000000000.0) * _periodNano));
-         } else {
-             _periodNano = Long.MAX_VALUE - 1;
-             _emitAmount = 1;
-         }
-     }
-
-     @Override
-     public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
-         _collector = collector;
-         _rand = ThreadLocalRandom.current();
-         _nextEmitTime = System.nanoTime();
-         _emitsLeft = _emitAmount;
-         _histo = new HistogramMetric(3600000000000L, 3);
-         context.registerMetric("comp-lat-histo", _histo, 10); //Update every 10 seconds, so we are not too far behind
-     }
-
-     @Override
-     public void nextTuple() {
-         if (_emitsLeft <= 0 && _nextEmitTime <= System.nanoTime()) {
-             _emitsLeft = _emitAmount;
-             // A plain addition wraps to a negative deadline when the period is close to
-             // Long.MAX_VALUE, which would make every later call emit immediately.
-             _nextEmitTime = saturatingAdd(_nextEmitTime, _periodNano);
-         }
-
-         if (_emitsLeft > 0) {
-             String sentence = SENTENCES[_rand.nextInt(SENTENCES.length)];
-             // The id carries the start of the current window, not the actual send time.
-             _collector.emit(new Values(sentence), new SentWithTime(sentence, _nextEmitTime - _periodNano));
-             _emitsLeft--;
-         }
-     }
-
-     @Override
-     public void ack(Object id) {
-         long end = System.nanoTime();
-         SentWithTime st = (SentWithTime)id;
-         _histo.recordValue(end-st.time);
-     }
-
-     @Override
-     public void fail(Object id) {
-         // Replay with the original id so the recorded latency includes the retry.
-         SentWithTime st = (SentWithTime)id;
-         _collector.emit(new Values(st.sentence), id);
-     }
-
-     @Override
-     public void declareOutputFields(OutputFieldsDeclarer declarer) {
-         declarer.declare(new Fields("sentence"));
-     }
-
-     /** Adds a positive increment to base, clamping at Long.MAX_VALUE instead of wrapping. */
-     private static long saturatingAdd(long base, long increment) {
-         long sum = base + increment;
-         return sum < base ? Long.MAX_VALUE : sum;
-     }
- }
-
- public static class SplitSentence extends BaseBasicBolt {
- @Override
- public void execute(Tuple tuple, BasicOutputCollector collector) {
- String sentence = tuple.getString(0);
- for (String word: sentence.split("\\s+")) {
- collector.emit(new Values(word, 1));
- }
- }
-
- @Override
- public void declareOutputFields(OutputFieldsDeclarer declarer) {
- declarer.declare(new Fields("word", "count"));
- }
- }
-
- /** Bolt that keeps a running per-word tally in memory and emits the updated tally for each word seen. */
- public static class WordCount extends BaseBasicBolt {
-     Map<String, Integer> counts = new HashMap<String, Integer>();
-
-     @Override
-     public void declareOutputFields(OutputFieldsDeclarer declarer) {
-         declarer.declare(new Fields("word", "count"));
-     }
-
-     @Override
-     public void execute(Tuple tuple, BasicOutputCollector collector) {
-         String word = tuple.getString(0);
-         Integer seen = counts.get(word);
-         // First sighting starts from zero.
-         Integer updated = (seen == null ? 0 : seen) + 1;
-         counts.put(word, updated);
-         collector.emit(new Values(word, updated));
-     }
- }
-
- private static class MemMeasure {
- private long _mem = 0;
- private long _time = 0;
-
- public synchronized void update(long mem) {
- _mem = mem;
- _time = System.currentTimeMillis();
- }
-
- public synchronized long get() {
- return isExpired() ? 0l : _mem;
- }
-
- public synchronized boolean isExpired() {
- return (System.currentTimeMillis() - _time) >= 20000;
- }
- }
-
- private static final Histogram _histo = new Histogram(3600000000000L, 3);
- private static final AtomicLong _systemCPU = new AtomicLong(0);
- private static final AtomicLong _userCPU = new AtomicLong(0);
- private static final AtomicLong _gcCount = new AtomicLong(0);
- private static final AtomicLong _gcMs = new AtomicLong(0);
- private static final ConcurrentHashMap<String, MemMeasure> _memoryBytes = new ConcurrentHashMap<String, MemMeasure>();
-
- /** Sums the current (non-expired) memory readings of all workers seen so far. */
- private static long readMemory() {
-     long sum = 0;
-     for (Map.Entry<String, MemMeasure> perWorker : _memoryBytes.entrySet()) {
-         sum += perWorker.getValue().get();
-     }
-     return sum;
- }
-
- private static long _prev_acked = 0;
- private static long _prev_uptime = 0;
-
- /**
-  * Prints one line of throughput, latency, CPU, GC and memory statistics for the named
-  * topology, then resets the latency histogram and the CPU/GC accumulators.
-  * <p>
-  * Acked and failed totals are the ":all-time" spout counters summed over every executor
-  * of the "spout" component; acked/sec is the change since the previous call divided by
-  * the change in topology uptime.
-  *
-  * @param client cluster facade used to look the topology up
-  * @param name   topology name
-  * @throws Exception if no topology with that name is running, or a cluster call fails
-  */
- public static void printMetrics(C client, String name) throws Exception {
-     ClusterSummary summary = client.getClusterInfo();
-     String id = null;
-     for (TopologySummary ts: summary.get_topologies()) {
-         if (name.equals(ts.get_name())) {
-             id = ts.get_id();
-         }
-     }
-     if (id == null) {
-         throw new Exception("Could not find a topology named "+name);
-     }
-     TopologyInfo info = client.getTopologyInfo(id);
-     int uptime = info.get_uptime_secs();
-     long acked = 0;
-     long failed = 0;
-     for (ExecutorSummary exec: info.get_executors()) {
-         if (!"spout".equals(exec.get_component_id())) {
-             continue;
-         }
-         SpoutStats stats = exec.get_stats().get_specific().get_spout();
-         // Sum the two maps independently: failures must be counted even when the
-         // acked map is missing or has no entry for the same stream.
-         acked += sumCounts(stats.get_acked().get(":all-time"));
-         failed += sumCounts(stats.get_failed().get(":all-time"));
-     }
-     long ackedThisTime = acked - _prev_acked;
-     long thisTime = uptime - _prev_uptime;
-     // Avoid Infinity/NaN when two reports fall within the same uptime second.
-     double ackedPerSec = thisTime > 0 ? ((double)ackedThisTime)/thisTime : 0.0;
-     long nnpct, nnnpct, min, max;
-     double mean, stddev;
-     synchronized(_histo) {
-         nnpct = _histo.getValueAtPercentile(99.0);
-         nnnpct = _histo.getValueAtPercentile(99.9);
-         min = _histo.getMinValue();
-         max = _histo.getMaxValue();
-         mean = _histo.getMean();
-         stddev = _histo.getStdDeviation();
-         _histo.reset();
-     }
-     long user = _userCPU.getAndSet(0);
-     long sys = _systemCPU.getAndSet(0);
-     long gc = _gcMs.getAndSet(0);
-     double memMB = readMemory() / (1024.0 * 1024.0);
-     System.out.printf("uptime: %,4d acked: %,9d acked/sec: %,10.2f failed: %,8d " +
-         "99%%: %,15d 99.9%%: %,15d min: %,15d max: %,15d mean: %,15.2f " +
-         "stddev: %,15.2f user: %,10d sys: %,10d gc: %,10d mem: %,10.2f\n",
-         uptime, ackedThisTime, ackedPerSec, failed, nnpct, nnnpct,
-         min, max, mean, stddev, user, sys, gc, memMB);
-     _prev_uptime = uptime;
-     _prev_acked = acked;
- }
-
- /** Returns the sum of all non-null values in the map, or 0 when the map itself is null. */
- private static long sumCounts(Map<String, Long> perStream) {
-     long total = 0;
-     if (perStream != null) {
-         for (Long value : perStream.values()) {
-             if (value != null) {
-                 total += value;
-             }
-         }
-     }
-     return total;
- }
-
- /** Kills the named topology with a wait time of zero seconds. */
- public static void kill(C client, String name) throws Exception {
-     KillOptions immediate = new KillOptions();
-     immediate.set_wait_secs(0);
-     client.killTopologyWithOpts(name, immediate);
- }
-
- public static void main(String[] args) throws Exception { // args: [ratePerSecond] [parallelism] [numMins] [topologyName], all optional
- long ratePerSecond = 500; // target rate, later divided across the spout executors
- if (args != null && args.length > 0) {
- ratePerSecond = Long.valueOf(args[0]);
- }
- // parallelism is the worker count; each component gets 4x this many executors (numEach below).
- int parallelism = 4;
- if (args != null && args.length > 1) {
- parallelism = Integer.valueOf(args[1]);
- }
- // Run length in minutes; metrics are printed every 30 seconds, i.e. numMins * 2 times.
- int numMins = 5;
- if (args != null && args.length > 2) {
- numMins = Integer.valueOf(args[2]);
- }
- // Topology name used for submission, metric lookup and the final kill.
- String name = "wc-test";
- if (args != null && args.length > 3) {
- name = args[3];
- }
- // The metrics server's handle() folds incoming data points into the static accumulators of this class.
- Config conf = new Config();
- HttpForwardingMetricsServer metricServer = new HttpForwardingMetricsServer(conf) {
- @Override
- public void handle(TaskInfo taskInfo, Collection<DataPoint> dataPoints) {
- String worker = taskInfo.srcWorkerHost + ":" + taskInfo.srcWorkerPort; // key for per-worker memory readings
- for (DataPoint dp: dataPoints) {
- if ("comp-lat-histo".equals(dp.name) && dp.value instanceof Histogram) {
- synchronized(_histo) { // same monitor printMetrics holds while reading and resetting
- _histo.add((Histogram)dp.value);
- }
- } else if ("CPU".equals(dp.name) && dp.value instanceof Map) {
- Map<Object, Object> m = (Map<Object, Object>)dp.value;
- Object sys = m.get("sys-ms");
- if (sys instanceof Number) {
- _systemCPU.getAndAdd(((Number)sys).longValue());
- }
- Object user = m.get("user-ms");
- if (user instanceof Number) {
- _userCPU.getAndAdd(((Number)user).longValue());
- }
- } else if (dp.name.startsWith("GC/") && dp.value instanceof Map) { // NOTE(review): unlike the equals() checks above, this throws if dp.name is null
- Map<Object, Object> m = (Map<Object, Object>)dp.value;
- Object count = m.get("count");
- if (count instanceof Number) {
- _gcCount.getAndAdd(((Number)count).longValue());
- }
- Object time = m.get("timeMs");
- if (time instanceof Number) {
- _gcMs.getAndAdd(((Number)time).longValue());
- }
- } else if (dp.name.startsWith("memory/") && dp.value instanceof Map) {
- Map<Object, Object> m = (Map<Object, Object>)dp.value;
- Object val = m.get("usedBytes");
- if (val instanceof Number) {
- MemMeasure mm = _memoryBytes.get(worker);
- if (mm == null) {
- mm = new MemMeasure();
- MemMeasure tmp = _memoryBytes.putIfAbsent(worker, mm);
- mm = tmp == null ? mm : tmp; // keep whichever instance won the race
- }
- mm.update(((Number)val).longValue()); // one slot per worker: later "memory/" points overwrite earlier ones
- }
- }
- }
- }
- };
- // NOTE(review): presumably getUrl() is only meaningful once serve() has been called - confirm.
- metricServer.serve();
- String url = metricServer.getUrl();
- // C copies conf at construction, so it only sees conf's contents as of this line.
- C cluster = new C(conf);
- conf.setNumWorkers(parallelism);
- conf.registerMetricsConsumer(backtype.storm.metric.LoggingMetricsConsumer.class);
- conf.registerMetricsConsumer(backtype.storm.metric.HttpForwardingMetricsConsumer.class, url, 1);
- Map<String, String> workerMetrics = new HashMap<String, String>();
- if (!cluster.isLocal()) {
- //sigar uses JNI and does not work in local mode
- workerMetrics.put("CPU", "org.apache.storm.metrics.sigar.CPUMetric");
- }
- conf.put(Config.TOPOLOGY_WORKER_METRICS, workerMetrics);
- conf.put(Config.TOPOLOGY_BUILTIN_METRICS_BUCKET_SIZE_SECS, 10);
- conf.put(Config.TOPOLOGY_WORKER_GC_CHILDOPTS,
- "-XX:+UseConcMarkSweepGC -XX:+UseParNewGC -XX:+UseConcMarkSweepGC -XX:NewSize=128m -XX:CMSInitiatingOccupancyFraction=70 -XX:-CMSConcurrentMTEnabled"); // NOTE(review): -XX:+UseConcMarkSweepGC is listed twice
- conf.put(Config.TOPOLOGY_WORKER_CHILDOPTS, "-Xmx2g");
-
- TopologyBuilder builder = new TopologyBuilder();
- // NOTE(review): ratePerSecond/numEach is integer division; a rate below numEach passes 0 to every spout.
- int numEach = 4 * parallelism;
- builder.setSpout("spout", new FastRandomSentenceSpout(ratePerSecond/numEach), numEach);
-
- builder.setBolt("split", new SplitSentence(), numEach).shuffleGrouping("spout");
- builder.setBolt("count", new WordCount(), numEach).fieldsGrouping("split", new Fields("word"));
- // The finally block kills the topology even when submission or polling throws.
- try {
- cluster.submitTopology(name, conf, builder.createTopology());
-
- for (int i = 0; i < numMins * 2; i++) {
- Thread.sleep(30 * 1000);
- printMetrics(cluster, name);
- }
- } finally {
- kill(cluster, name); // NOTE(review): an exception thrown here replaces any exception from the try block
- }
- System.exit(0); // NOTE(review): presumably needed because background threads would keep the JVM alive - confirm
- }
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/examples/storm-starter/src/jvm/storm/starter/TransactionalGlobalCount.java
----------------------------------------------------------------------
diff --git a/examples/storm-starter/src/jvm/storm/starter/TransactionalGlobalCount.java b/examples/storm-starter/src/jvm/storm/starter/TransactionalGlobalCount.java
deleted file mode 100644
index 706afd1..0000000
--- a/examples/storm-starter/src/jvm/storm/starter/TransactionalGlobalCount.java
+++ /dev/null
@@ -1,174 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package storm.starter;
-
-import backtype.storm.Config;
-import backtype.storm.LocalCluster;
-import backtype.storm.coordination.BatchOutputCollector;
-import backtype.storm.task.TopologyContext;
-import backtype.storm.testing.MemoryTransactionalSpout;
-import backtype.storm.topology.OutputFieldsDeclarer;
-import backtype.storm.topology.base.BaseBatchBolt;
-import backtype.storm.topology.base.BaseTransactionalBolt;
-import backtype.storm.transactional.ICommitter;
-import backtype.storm.transactional.TransactionAttempt;
-import backtype.storm.transactional.TransactionalTopologyBuilder;
-import backtype.storm.tuple.Fields;
-import backtype.storm.tuple.Tuple;
-import backtype.storm.tuple.Values;
-
-import java.math.BigInteger;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-/**
- * This is a basic example of a transactional topology. It keeps a count of the number of tuples seen so far in a
- * database. The source of data and the databases are mocked out as in memory maps for demonstration purposes.
- *
- * @see <a href="http://storm.apache.org/documentation/Transactional-topologies.html">Transactional topologies</a>
- */
-public class TransactionalGlobalCount {
- public static final int PARTITION_TAKE_PER_BATCH = 3;
- public static final Map<Integer, List<List<Object>>> DATA = new HashMap<Integer, List<List<Object>>>() {{
- put(0, new ArrayList<List<Object>>() {{
- add(new Values("cat"));
- add(new Values("dog"));
- add(new Values("chicken"));
- add(new Values("cat"));
- add(new Values("dog"));
- add(new Values("apple"));
- }});
- put(1, new ArrayList<List<Object>>() {{
- add(new Values("cat"));
- add(new Values("dog"));
- add(new Values("apple"));
- add(new Values("banana"));
- }});
- put(2, new ArrayList<List<Object>>() {{
- add(new Values("cat"));
- add(new Values("cat"));
- add(new Values("cat"));
- add(new Values("cat"));
- add(new Values("cat"));
- add(new Values("dog"));
- add(new Values("dog"));
- add(new Values("dog"));
- add(new Values("dog"));
- }});
- }};
-
- /** A stored count together with the id of the transaction that last wrote it. */
- public static class Value {
-     /** Transaction id of the last write; stays null until a writer assigns it. */
-     BigInteger txid;
-     /** Stored count; starts at zero. */
-     int count;
- }
-
- public static Map<String, Value> DATABASE = new HashMap<String, Value>();
- public static final String GLOBAL_COUNT_KEY = "GLOBAL-COUNT";
-
- /** Batch bolt that counts the tuples it receives and emits (id, count) when the batch finishes. */
- public static class BatchCount extends BaseBatchBolt {
-     Object _id;
-     BatchOutputCollector _collector;
-
-     int _count = 0;
-
-     @Override
-     public void declareOutputFields(OutputFieldsDeclarer declarer) {
-         declarer.declare(new Fields("id", "count"));
-     }
-
-     @Override
-     public void prepare(Map conf, TopologyContext context, BatchOutputCollector collector, Object id) {
-         _id = id;
-         _collector = collector;
-     }
-
-     @Override
-     public void execute(Tuple tuple) {
-         // Only the number of tuples matters; the content is ignored.
-         _count += 1;
-     }
-
-     @Override
-     public void finishBatch() {
-         _collector.emit(new Values(_id, _count));
-     }
- }
-
- /**
-  * Committer bolt that adds the partial counts of one batch to the global count kept in
-  * {@code DATABASE}. The stored transaction id makes the update idempotent: a batch whose
-  * id is already recorded is not added a second time.
-  */
- public static class UpdateGlobalCount extends BaseTransactionalBolt implements ICommitter {
-     TransactionAttempt _attempt;
-     BatchOutputCollector _collector;
-
-     int _sum = 0;
-
-     @Override
-     public void declareOutputFields(OutputFieldsDeclarer declarer) {
-         declarer.declare(new Fields("id", "sum"));
-     }
-
-     @Override
-     public void prepare(Map conf, TopologyContext context, BatchOutputCollector collector, TransactionAttempt attempt) {
-         _attempt = attempt;
-         _collector = collector;
-     }
-
-     @Override
-     public void execute(Tuple tuple) {
-         // Field 1 is the partial count.
-         _sum += tuple.getInteger(1);
-     }
-
-     @Override
-     public void finishBatch() {
-         BigInteger txid = _attempt.getTransactionId();
-         Value stored = DATABASE.get(GLOBAL_COUNT_KEY);
-         Value result;
-         if (stored != null && stored.txid.equals(txid)) {
-             // This transaction was already applied; report the stored total unchanged.
-             result = stored;
-         } else {
-             result = new Value();
-             result.txid = txid;
-             result.count = (stored == null) ? _sum : _sum + stored.count;
-             DATABASE.put(GLOBAL_COUNT_KEY, result);
-         }
-         _collector.emit(new Values(_attempt, result.count));
-     }
- }
-
- /**
-  * Runs the global-count transactional topology on an in-process cluster for three
-  * seconds. The cluster is shut down even if submission or the wait fails.
-  */
- public static void main(String[] args) throws Exception {
-     MemoryTransactionalSpout spout = new MemoryTransactionalSpout(DATA, new Fields("word"), PARTITION_TAKE_PER_BATCH);
-     TransactionalTopologyBuilder builder = new TransactionalTopologyBuilder("global-count", "spout", spout, 3);
-     builder.setBolt("partial-count", new BatchCount(), 5).noneGrouping("spout");
-     builder.setBolt("sum", new UpdateGlobalCount()).globalGrouping("partial-count");
-
-     Config config = new Config();
-     config.setDebug(true);
-     config.setMaxSpoutPending(3);
-
-     LocalCluster cluster = new LocalCluster();
-     try {
-         cluster.submitTopology("global-count-topology", config, builder.buildTopology());
-
-         Thread.sleep(3000);
-     } finally {
-         // Without this, a failure above would leave the local cluster running.
-         cluster.shutdown();
-     }
- }
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/examples/storm-starter/src/jvm/storm/starter/TransactionalWords.java
----------------------------------------------------------------------
diff --git a/examples/storm-starter/src/jvm/storm/starter/TransactionalWords.java b/examples/storm-starter/src/jvm/storm/starter/TransactionalWords.java
deleted file mode 100644
index 4d5ba1b..0000000
--- a/examples/storm-starter/src/jvm/storm/starter/TransactionalWords.java
+++ /dev/null
@@ -1,246 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package storm.starter;
-
-import backtype.storm.Config;
-import backtype.storm.LocalCluster;
-import backtype.storm.coordination.BatchOutputCollector;
-import backtype.storm.task.TopologyContext;
-import backtype.storm.testing.MemoryTransactionalSpout;
-import backtype.storm.topology.BasicOutputCollector;
-import backtype.storm.topology.OutputFieldsDeclarer;
-import backtype.storm.topology.base.BaseBasicBolt;
-import backtype.storm.topology.base.BaseTransactionalBolt;
-import backtype.storm.transactional.ICommitter;
-import backtype.storm.transactional.TransactionAttempt;
-import backtype.storm.transactional.TransactionalTopologyBuilder;
-import backtype.storm.tuple.Fields;
-import backtype.storm.tuple.Tuple;
-import backtype.storm.tuple.Values;
-
-import java.math.BigInteger;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-/**
- * This class defines a more involved transactional topology than TransactionalGlobalCount. This topology processes a
- * stream of words and produces two outputs:
- * <p/>
- * 1. A count for each word (stored in a database) 2. The number of words for every bucket of 10 counts. So it stores in
- * the database how many words have appeared 0-9 times, how many have appeared 10-19 times, and so on.
- * <p/>
- * A batch of words can cause the bucket counts to decrement for some buckets and increment for others as words move
- * between buckets as their counts accumulate.
- */
-public class TransactionalWords {
- /** Per-word record: the current count, the count before the last write, and the writing transaction. */
- public static class CountValue {
-     /** Transaction id of the last write; null until assigned. */
-     BigInteger txid;
-     /** Count before the last write; null when the word had no earlier record. */
-     Integer prev_count;
-     /** Current count; starts at zero. */
-     int count;
- }
-
- /** Per-bucket record: how many words are in the bucket and which transaction last wrote it. */
- public static class BucketValue {
-     /** Transaction id of the last write; null until assigned. */
-     BigInteger txid;
-     /** Number of words currently in the bucket; starts at zero. */
-     int count;
- }
-
- public static final int BUCKET_SIZE = 10;
-
- public static Map<String, CountValue> COUNT_DATABASE = new HashMap<String, CountValue>();
- public static Map<Integer, BucketValue> BUCKET_DATABASE = new HashMap<Integer, BucketValue>();
-
-
- public static final int PARTITION_TAKE_PER_BATCH = 3;
-
- public static final Map<Integer, List<List<Object>>> DATA = new HashMap<Integer, List<List<Object>>>() {{
- put(0, new ArrayList<List<Object>>() {{
- add(new Values("cat"));
- add(new Values("dog"));
- add(new Values("chicken"));
- add(new Values("cat"));
- add(new Values("dog"));
- add(new Values("apple"));
- }});
- put(1, new ArrayList<List<Object>>() {{
- add(new Values("cat"));
- add(new Values("dog"));
- add(new Values("apple"));
- add(new Values("banana"));
- }});
- put(2, new ArrayList<List<Object>>() {{
- add(new Values("cat"));
- add(new Values("cat"));
- add(new Values("cat"));
- add(new Values("cat"));
- add(new Values("cat"));
- add(new Values("dog"));
- add(new Values("dog"));
- add(new Values("dog"));
- add(new Values("dog"));
- }});
- }};
-
- /**
-  * Committer bolt that tallies the words of one batch and, when the batch finishes, adds
-  * the tallies to {@code COUNT_DATABASE}. For every word it emits
-  * (attempt, key, count, prev-count).
-  * <p>
-  * The transaction id stored with each record makes the update idempotent: if the record
-  * was already written by the current transaction, the stored values are emitted as-is.
-  */
- public static class KeyedCountUpdater extends BaseTransactionalBolt implements ICommitter {
-     Map<String, Integer> _counts = new HashMap<String, Integer>();
-     BatchOutputCollector _collector;
-     TransactionAttempt _id;
-
-     // Not read anywhere in this class; retained so the class layout is unchanged.
-     int _count = 0;
-
-     @Override
-     public void prepare(Map conf, TopologyContext context, BatchOutputCollector collector, TransactionAttempt id) {
-         _collector = collector;
-         _id = id;
-     }
-
-     @Override
-     public void execute(Tuple tuple) {
-         // Field 1 is the word.
-         String key = tuple.getString(1);
-         Integer curr = _counts.get(key);
-         if (curr == null) {
-             curr = 0;
-         }
-         _counts.put(key, curr + 1);
-     }
-
-     @Override
-     public void finishBatch() {
-         // Compare against the transaction id, not the TransactionAttempt itself: a
-         // BigInteger never equals a TransactionAttempt, so the replay check could never match.
-         BigInteger txid = _id.getTransactionId();
-         for (Map.Entry<String, Integer> entry : _counts.entrySet()) {
-             String key = entry.getKey();
-             CountValue val = COUNT_DATABASE.get(key);
-             CountValue newVal;
-             if (val == null || !val.txid.equals(txid)) {
-                 newVal = new CountValue();
-                 newVal.txid = txid;
-                 if (val != null) {
-                     newVal.prev_count = val.count;
-                     newVal.count = val.count;
-                 }
-                 newVal.count = newVal.count + entry.getValue();
-                 COUNT_DATABASE.put(key, newVal);
-             }
-             else {
-                 newVal = val;
-             }
-             _collector.emit(new Values(_id, key, newVal.count, newVal.prev_count));
-         }
-     }
-
-     @Override
-     public void declareOutputFields(OutputFieldsDeclarer declarer) {
-         declarer.declare(new Fields("id", "key", "count", "prev-count"));
-     }
- }
-
- /**
-  * Turns a word's (count, prev-count) pair into bucket deltas: +1 for the bucket the word
-  * is in now and, if it moved, -1 for the bucket it left. Nothing is emitted when the word
-  * stays in the same bucket.
-  */
- public static class Bucketize extends BaseBasicBolt {
-     @Override
-     public void declareOutputFields(OutputFieldsDeclarer declarer) {
-         declarer.declare(new Fields("attempt", "bucket", "delta"));
-     }
-
-     @Override
-     public void execute(Tuple tuple, BasicOutputCollector collector) {
-         TransactionAttempt attempt = (TransactionAttempt) tuple.getValue(0);
-         int newBucket = tuple.getInteger(2) / BUCKET_SIZE;
-         Integer before = tuple.getInteger(3);
-
-         if (before == null) {
-             // First time this word is counted: it only enters a bucket.
-             collector.emit(new Values(attempt, newBucket, 1));
-             return;
-         }
-
-         int oldBucket = before / BUCKET_SIZE;
-         if (newBucket != oldBucket) {
-             collector.emit(new Values(attempt, newBucket, 1));
-             collector.emit(new Values(attempt, oldBucket, -1));
-         }
-     }
- }
-
- public static class BucketCountUpdater extends BaseTransactionalBolt {
- Map<Integer, Integer> _accum = new HashMap<Integer, Integer>();
- BatchOutputCollector _collector;
- TransactionAttempt _attempt;
-
- int _count = 0;
-
- @Override
- public void prepare(Map conf, TopologyContext context, BatchOutputCollector collector, TransactionAttempt attempt) {
- _collector = collector;
- _attempt = attempt;
- }
-
- @Override
- public void execute(Tuple tuple) {
- Integer bucket = tuple.getInteger(1);
- Integer delta = tuple.getInteger(2);
- Integer curr = _accum.get(bucket);
- if (curr == null)
- curr = 0;
- _accum.put(bucket, curr + delta);
- }
-
- @Override
- public void finishBatch() {
- for (Integer bucket : _accum.keySet()) {
- BucketValue currVal = BUCKET_DATABASE.get(bucket);
- BucketValue newVal;
- if (currVal == null || !currVal.txid.equals(_attempt.getTransactionId())) {
- newVal = new BucketValue();
- newVal.txid = _attempt.getTransactionId();
- newVal.count = _accum.get(bucket);
- if (currVal != null)
- newVal.count += currVal.count;
- BUCKET_DATABASE.put(bucket, newVal);
- }
- else {
- newVal = currVal;
- }
- _collector.emit(new Values(_attempt, bucket, newVal.count));
- }
- }
-
- @Override
- public void declareOutputFields(OutputFieldsDeclarer declarer) {
- declarer.declare(new Fields("id", "bucket", "count"));
- }
- }
-
- /**
-  * Runs the word/bucket counting transactional topology on an in-process cluster for
-  * three seconds. The cluster is shut down even if submission or the wait fails.
-  */
- public static void main(String[] args) throws Exception {
-     MemoryTransactionalSpout spout = new MemoryTransactionalSpout(DATA, new Fields("word"), PARTITION_TAKE_PER_BATCH);
-     TransactionalTopologyBuilder builder = new TransactionalTopologyBuilder("top-n-words", "spout", spout, 2);
-     builder.setBolt("count", new KeyedCountUpdater(), 5).fieldsGrouping("spout", new Fields("word"));
-     builder.setBolt("bucketize", new Bucketize()).noneGrouping("count");
-     builder.setBolt("buckets", new BucketCountUpdater(), 5).fieldsGrouping("bucketize", new Fields("bucket"));
-
-     Config config = new Config();
-     config.setDebug(true);
-     config.setMaxSpoutPending(3);
-
-     LocalCluster cluster = new LocalCluster();
-     try {
-         cluster.submitTopology("top-n-topology", config, builder.buildTopology());
-
-         Thread.sleep(3000);
-     } finally {
-         // Without this, a failure above would leave the local cluster running.
-         cluster.shutdown();
-     }
- }
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/examples/storm-starter/src/jvm/storm/starter/WordCountTopology.java
----------------------------------------------------------------------
diff --git a/examples/storm-starter/src/jvm/storm/starter/WordCountTopology.java b/examples/storm-starter/src/jvm/storm/starter/WordCountTopology.java
deleted file mode 100644
index 7260beb..0000000
--- a/examples/storm-starter/src/jvm/storm/starter/WordCountTopology.java
+++ /dev/null
@@ -1,106 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package storm.starter;
-
-import backtype.storm.Config;
-import backtype.storm.LocalCluster;
-import backtype.storm.StormSubmitter;
-import backtype.storm.task.ShellBolt;
-import backtype.storm.topology.BasicOutputCollector;
-import backtype.storm.topology.IRichBolt;
-import backtype.storm.topology.OutputFieldsDeclarer;
-import backtype.storm.topology.TopologyBuilder;
-import backtype.storm.topology.base.BaseBasicBolt;
-import backtype.storm.tuple.Fields;
-import backtype.storm.tuple.Tuple;
-import backtype.storm.tuple.Values;
-import storm.starter.spout.RandomSentenceSpout;
-
-import java.util.HashMap;
-import java.util.Map;
-
-/**
- * This topology demonstrates Storm's stream groupings and multilang capabilities.
- */
-public class WordCountTopology {
- /** Multilang bolt that runs {@code python splitsentence.py} and declares a single "word" output field. */
- public static class SplitSentence extends ShellBolt implements IRichBolt {
-
-     public SplitSentence() {
-         super("python", "splitsentence.py");
-     }
-
-     /** No component-specific configuration. */
-     @Override
-     public Map<String, Object> getComponentConfiguration() {
-         return null;
-     }
-
-     @Override
-     public void declareOutputFields(OutputFieldsDeclarer declarer) {
-         declarer.declare(new Fields("word"));
-     }
- }
-
- /** Bolt that keeps a running per-word tally in memory and emits the updated tally for each word seen. */
- public static class WordCount extends BaseBasicBolt {
-     Map<String, Integer> counts = new HashMap<String, Integer>();
-
-     @Override
-     public void declareOutputFields(OutputFieldsDeclarer declarer) {
-         declarer.declare(new Fields("word", "count"));
-     }
-
-     @Override
-     public void execute(Tuple tuple, BasicOutputCollector collector) {
-         String word = tuple.getString(0);
-         Integer seen = counts.get(word);
-         // First sighting starts from zero.
-         Integer updated = (seen == null ? 0 : seen) + 1;
-         counts.put(word, updated);
-         collector.emit(new Values(word, updated));
-     }
- }
-
- /**
-  * Builds the word-count topology. With at least one argument the topology is submitted
-  * to a cluster under the name {@code args[0]}; otherwise it runs on an in-process cluster
-  * for ten seconds, and that cluster is shut down even if submission or the wait fails.
-  */
- public static void main(String[] args) throws Exception {
-
-     TopologyBuilder builder = new TopologyBuilder();
-
-     builder.setSpout("spout", new RandomSentenceSpout(), 5);
-
-     builder.setBolt("split", new SplitSentence(), 8).shuffleGrouping("spout");
-     builder.setBolt("count", new WordCount(), 12).fieldsGrouping("split", new Fields("word"));
-
-     Config conf = new Config();
-     conf.setDebug(true);
-
-     if (args != null && args.length > 0) {
-         conf.setNumWorkers(3);
-
-         StormSubmitter.submitTopologyWithProgressBar(args[0], conf, builder.createTopology());
-     }
-     else {
-         conf.setMaxTaskParallelism(3);
-
-         LocalCluster cluster = new LocalCluster();
-         try {
-             cluster.submitTopology("word-count", conf, builder.createTopology());
-
-             Thread.sleep(10000);
-         } finally {
-             // Without this, a failure above would leave the local cluster running.
-             cluster.shutdown();
-         }
-     }
- }
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/examples/storm-starter/src/jvm/storm/starter/WordCountTopologyNode.java
----------------------------------------------------------------------
diff --git a/examples/storm-starter/src/jvm/storm/starter/WordCountTopologyNode.java b/examples/storm-starter/src/jvm/storm/starter/WordCountTopologyNode.java
deleted file mode 100644
index 3fe982f..0000000
--- a/examples/storm-starter/src/jvm/storm/starter/WordCountTopologyNode.java
+++ /dev/null
@@ -1,121 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package storm.starter;
-
-import backtype.storm.Config;
-import backtype.storm.LocalCluster;
-import backtype.storm.StormSubmitter;
-import backtype.storm.spout.ShellSpout;
-import backtype.storm.task.ShellBolt;
-import backtype.storm.topology.*;
-import backtype.storm.topology.base.BaseBasicBolt;
-import backtype.storm.tuple.Fields;
-import backtype.storm.tuple.Tuple;
-import backtype.storm.tuple.Values;
-
-import java.util.HashMap;
-import java.util.Map;
-
-/**
- * This topology demonstrates Storm's stream groupings and multilang capabilities.
- */
-public class WordCountTopologyNode {
- public static class SplitSentence extends ShellBolt implements IRichBolt {
-
- public SplitSentence() {
- super("node", "splitsentence.js");
- }
-
- @Override
- public void declareOutputFields(OutputFieldsDeclarer declarer) {
- declarer.declare(new Fields("word"));
- }
-
- @Override
- public Map<String, Object> getComponentConfiguration() {
- return null;
- }
- }
-
- public static class RandomSentence extends ShellSpout implements IRichSpout {
-
- public RandomSentence() {
- super("node", "randomsentence.js");
- }
-
- @Override
- public void declareOutputFields(OutputFieldsDeclarer declarer) {
- declarer.declare(new Fields("word"));
- }
-
- @Override
- public Map<String, Object> getComponentConfiguration() {
- return null;
- }
- }
-
- public static class WordCount extends BaseBasicBolt {
- Map<String, Integer> counts = new HashMap<String, Integer>();
-
- @Override
- public void execute(Tuple tuple, BasicOutputCollector collector) {
- String word = tuple.getString(0);
- Integer count = counts.get(word);
- if (count == null)
- count = 0;
- count++;
- counts.put(word, count);
- collector.emit(new Values(word, count));
- }
-
- @Override
- public void declareOutputFields(OutputFieldsDeclarer declarer) {
- declarer.declare(new Fields("word", "count"));
- }
- }
-
- public static void main(String[] args) throws Exception {
-
- TopologyBuilder builder = new TopologyBuilder();
-
- builder.setSpout("spout", new RandomSentence(), 5);
-
- builder.setBolt("split", new SplitSentence(), 8).shuffleGrouping("spout");
- builder.setBolt("count", new WordCount(), 12).fieldsGrouping("split", new Fields("word"));
-
- Config conf = new Config();
- conf.setDebug(true);
-
-
- if (args != null && args.length > 0) {
- conf.setNumWorkers(3);
-
- StormSubmitter.submitTopologyWithProgressBar(args[0], conf, builder.createTopology());
- }
- else {
- conf.setMaxTaskParallelism(3);
-
- LocalCluster cluster = new LocalCluster();
- cluster.submitTopology("word-count", conf, builder.createTopology());
-
- Thread.sleep(10000);
-
- cluster.shutdown();
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/examples/storm-starter/src/jvm/storm/starter/bolt/AbstractRankerBolt.java
----------------------------------------------------------------------
diff --git a/examples/storm-starter/src/jvm/storm/starter/bolt/AbstractRankerBolt.java b/examples/storm-starter/src/jvm/storm/starter/bolt/AbstractRankerBolt.java
deleted file mode 100644
index 64ceb29..0000000
--- a/examples/storm-starter/src/jvm/storm/starter/bolt/AbstractRankerBolt.java
+++ /dev/null
@@ -1,110 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package storm.starter.bolt;
-
-import backtype.storm.Config;
-import backtype.storm.topology.BasicOutputCollector;
-import backtype.storm.topology.OutputFieldsDeclarer;
-import backtype.storm.topology.base.BaseBasicBolt;
-import backtype.storm.tuple.Fields;
-import backtype.storm.tuple.Tuple;
-import backtype.storm.tuple.Values;
-import backtype.storm.utils.TupleUtils;
-import org.apache.log4j.Logger;
-import storm.starter.tools.Rankings;
-
-import java.util.HashMap;
-import java.util.Map;
-
-/**
- * This abstract bolt provides the basic behavior of bolts that rank objects according to their count.
- * <p/>
- * It uses a template method design pattern for {@link AbstractRankerBolt#execute(Tuple, BasicOutputCollector)} to allow
- * actual bolt implementations to specify how incoming tuples are processed, i.e. how the objects embedded within those
- * tuples are retrieved and counted.
- */
-public abstract class AbstractRankerBolt extends BaseBasicBolt {
-
- private static final long serialVersionUID = 4931640198501530202L;
- private static final int DEFAULT_EMIT_FREQUENCY_IN_SECONDS = 2;
- private static final int DEFAULT_COUNT = 10;
-
- private final int emitFrequencyInSeconds;
- private final int count;
- private final Rankings rankings;
-
- public AbstractRankerBolt() {
- this(DEFAULT_COUNT, DEFAULT_EMIT_FREQUENCY_IN_SECONDS);
- }
-
- public AbstractRankerBolt(int topN) {
- this(topN, DEFAULT_EMIT_FREQUENCY_IN_SECONDS);
- }
-
- public AbstractRankerBolt(int topN, int emitFrequencyInSeconds) {
- if (topN < 1) {
- throw new IllegalArgumentException("topN must be >= 1 (you requested " + topN + ")");
- }
- if (emitFrequencyInSeconds < 1) {
- throw new IllegalArgumentException(
- "The emit frequency must be >= 1 seconds (you requested " + emitFrequencyInSeconds + " seconds)");
- }
- count = topN;
- this.emitFrequencyInSeconds = emitFrequencyInSeconds;
- rankings = new Rankings(count);
- }
-
- protected Rankings getRankings() {
- return rankings;
- }
-
- /**
- * This method functions as a template method (design pattern).
- */
- @Override
- public final void execute(Tuple tuple, BasicOutputCollector collector) {
- if (TupleUtils.isTick(tuple)) {
- getLogger().debug("Received tick tuple, triggering emit of current rankings");
- emitRankings(collector);
- }
- else {
- updateRankingsWithTuple(tuple);
- }
- }
-
- abstract void updateRankingsWithTuple(Tuple tuple);
-
- private void emitRankings(BasicOutputCollector collector) {
- collector.emit(new Values(rankings.copy()));
- getLogger().debug("Rankings: " + rankings);
- }
-
- @Override
- public void declareOutputFields(OutputFieldsDeclarer declarer) {
- declarer.declare(new Fields("rankings"));
- }
-
- @Override
- public Map<String, Object> getComponentConfiguration() {
- Map<String, Object> conf = new HashMap<String, Object>();
- conf.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, emitFrequencyInSeconds);
- return conf;
- }
-
- abstract Logger getLogger();
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/examples/storm-starter/src/jvm/storm/starter/bolt/IntermediateRankingsBolt.java
----------------------------------------------------------------------
diff --git a/examples/storm-starter/src/jvm/storm/starter/bolt/IntermediateRankingsBolt.java b/examples/storm-starter/src/jvm/storm/starter/bolt/IntermediateRankingsBolt.java
deleted file mode 100644
index d1805ff..0000000
--- a/examples/storm-starter/src/jvm/storm/starter/bolt/IntermediateRankingsBolt.java
+++ /dev/null
@@ -1,58 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package storm.starter.bolt;
-
-import backtype.storm.tuple.Tuple;
-import org.apache.log4j.Logger;
-import storm.starter.tools.Rankable;
-import storm.starter.tools.RankableObjectWithFields;
-
-/**
- * This bolt ranks incoming objects by their count.
- * <p/>
- * It assumes the input tuples to adhere to the following format: (object, object_count, additionalField1,
- * additionalField2, ..., additionalFieldN).
- */
-public final class IntermediateRankingsBolt extends AbstractRankerBolt {
-
- private static final long serialVersionUID = -1369800530256637409L;
- private static final Logger LOG = Logger.getLogger(IntermediateRankingsBolt.class);
-
- public IntermediateRankingsBolt() {
- super();
- }
-
- public IntermediateRankingsBolt(int topN) {
- super(topN);
- }
-
- public IntermediateRankingsBolt(int topN, int emitFrequencyInSeconds) {
- super(topN, emitFrequencyInSeconds);
- }
-
- @Override
- void updateRankingsWithTuple(Tuple tuple) {
- Rankable rankable = RankableObjectWithFields.from(tuple);
- super.getRankings().updateWith(rankable);
- }
-
- @Override
- Logger getLogger() {
- return LOG;
- }
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/examples/storm-starter/src/jvm/storm/starter/bolt/PrinterBolt.java
----------------------------------------------------------------------
diff --git a/examples/storm-starter/src/jvm/storm/starter/bolt/PrinterBolt.java b/examples/storm-starter/src/jvm/storm/starter/bolt/PrinterBolt.java
deleted file mode 100644
index 58fc8ca..0000000
--- a/examples/storm-starter/src/jvm/storm/starter/bolt/PrinterBolt.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package storm.starter.bolt;
-
-import backtype.storm.topology.BasicOutputCollector;
-import backtype.storm.topology.OutputFieldsDeclarer;
-import backtype.storm.topology.base.BaseBasicBolt;
-import backtype.storm.tuple.Tuple;
-
-
-public class PrinterBolt extends BaseBasicBolt {
-
- @Override
- public void execute(Tuple tuple, BasicOutputCollector collector) {
- System.out.println(tuple);
- }
-
- @Override
- public void declareOutputFields(OutputFieldsDeclarer ofd) {
- }
-
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/examples/storm-starter/src/jvm/storm/starter/bolt/RollingCountAggBolt.java
----------------------------------------------------------------------
diff --git a/examples/storm-starter/src/jvm/storm/starter/bolt/RollingCountAggBolt.java b/examples/storm-starter/src/jvm/storm/starter/bolt/RollingCountAggBolt.java
deleted file mode 100644
index e222a97..0000000
--- a/examples/storm-starter/src/jvm/storm/starter/bolt/RollingCountAggBolt.java
+++ /dev/null
@@ -1,77 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package storm.starter.bolt;
-
-import backtype.storm.Config;
-import backtype.storm.task.OutputCollector;
-import backtype.storm.task.TopologyContext;
-import backtype.storm.topology.OutputFieldsDeclarer;
-import backtype.storm.topology.base.BaseRichBolt;
-import backtype.storm.tuple.Fields;
-import backtype.storm.tuple.Tuple;
-import backtype.storm.tuple.Values;
-import org.apache.log4j.Logger;
-import storm.starter.tools.NthLastModifiedTimeTracker;
-import storm.starter.tools.SlidingWindowCounter;
-
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Map.Entry;
-
-/**
- * This bolt aggregates counts from multiple upstream bolts.
- */
-public class RollingCountAggBolt extends BaseRichBolt {
- private static final long serialVersionUID = 5537727428628598519L;
- private static final Logger LOG = Logger.getLogger(RollingCountAggBolt.class);
- //Mapping of key->upstreamBolt->count
- private Map<Object, Map<Integer, Long>> counts = new HashMap<Object, Map<Integer, Long>>();
- private OutputCollector collector;
-
-
- @SuppressWarnings("rawtypes")
- @Override
- public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
- this.collector = collector;
- }
-
- @Override
- public void execute(Tuple tuple) {
- Object obj = tuple.getValue(0);
- long count = tuple.getLong(1);
- int source = tuple.getSourceTask();
- Map<Integer, Long> subCounts = counts.get(obj);
- if (subCounts == null) {
- subCounts = new HashMap<Integer, Long>();
- counts.put(obj, subCounts);
- }
- //Update the current count for this object
- subCounts.put(source, count);
- //Output the sum of all the known counts so for this key
- long sum = 0;
- for (Long val: subCounts.values()) {
- sum += val;
- }
- collector.emit(new Values(obj, sum));
- }
-
- @Override
- public void declareOutputFields(OutputFieldsDeclarer declarer) {
- declarer.declare(new Fields("obj", "count"));
- }
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/examples/storm-starter/src/jvm/storm/starter/bolt/RollingCountBolt.java
----------------------------------------------------------------------
diff --git a/examples/storm-starter/src/jvm/storm/starter/bolt/RollingCountBolt.java b/examples/storm-starter/src/jvm/storm/starter/bolt/RollingCountBolt.java
deleted file mode 100644
index 31f7ee2..0000000
--- a/examples/storm-starter/src/jvm/storm/starter/bolt/RollingCountBolt.java
+++ /dev/null
@@ -1,142 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package storm.starter.bolt;
-
-import backtype.storm.Config;
-import backtype.storm.task.OutputCollector;
-import backtype.storm.task.TopologyContext;
-import backtype.storm.topology.OutputFieldsDeclarer;
-import backtype.storm.topology.base.BaseRichBolt;
-import backtype.storm.tuple.Fields;
-import backtype.storm.tuple.Tuple;
-import backtype.storm.tuple.Values;
-import backtype.storm.utils.TupleUtils;
-import org.apache.log4j.Logger;
-import storm.starter.tools.NthLastModifiedTimeTracker;
-import storm.starter.tools.SlidingWindowCounter;
-
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Map.Entry;
-
-/**
- * This bolt performs rolling counts of incoming objects, i.e. sliding window based counting.
- * <p/>
- * The bolt is configured by two parameters, the length of the sliding window in seconds (which influences the output
- * data of the bolt, i.e. how it will count objects) and the emit frequency in seconds (which influences how often the
- * bolt will output the latest window counts). For instance, if the window length is set to an equivalent of five
- * minutes and the emit frequency to one minute, then the bolt will output the latest five-minute sliding window every
- * minute.
- * <p/>
- * The bolt emits a rolling count tuple per object, consisting of the object itself, its latest rolling count, and the
- * actual duration of the sliding window. The latter is included in case the expected sliding window length (as
- * configured by the user) is different from the actual length, e.g. due to high system load. Note that the actual
- * window length is tracked and calculated for the window, and not individually for each object within a window.
- * <p/>
- * Note: During the startup phase you will usually observe that the bolt warns you about the actual sliding window
- * length being smaller than the expected length. This behavior is expected and is caused by the way the sliding window
- * counts are initially "loaded up". You can safely ignore this warning during startup (e.g. you will see this warning
- * during the first ~ five minutes of startup time if the window length is set to five minutes).
- */
-public class RollingCountBolt extends BaseRichBolt {
-
- private static final long serialVersionUID = 5537727428628598519L;
- private static final Logger LOG = Logger.getLogger(RollingCountBolt.class);
- private static final int NUM_WINDOW_CHUNKS = 5;
- private static final int DEFAULT_SLIDING_WINDOW_IN_SECONDS = NUM_WINDOW_CHUNKS * 60;
- private static final int DEFAULT_EMIT_FREQUENCY_IN_SECONDS = DEFAULT_SLIDING_WINDOW_IN_SECONDS / NUM_WINDOW_CHUNKS;
- private static final String WINDOW_LENGTH_WARNING_TEMPLATE =
- "Actual window length is %d seconds when it should be %d seconds"
- + " (you can safely ignore this warning during the startup phase)";
-
- private final SlidingWindowCounter<Object> counter;
- private final int windowLengthInSeconds;
- private final int emitFrequencyInSeconds;
- private OutputCollector collector;
- private NthLastModifiedTimeTracker lastModifiedTracker;
-
- public RollingCountBolt() {
- this(DEFAULT_SLIDING_WINDOW_IN_SECONDS, DEFAULT_EMIT_FREQUENCY_IN_SECONDS);
- }
-
- public RollingCountBolt(int windowLengthInSeconds, int emitFrequencyInSeconds) {
- this.windowLengthInSeconds = windowLengthInSeconds;
- this.emitFrequencyInSeconds = emitFrequencyInSeconds;
- counter = new SlidingWindowCounter<Object>(deriveNumWindowChunksFrom(this.windowLengthInSeconds,
- this.emitFrequencyInSeconds));
- }
-
- private int deriveNumWindowChunksFrom(int windowLengthInSeconds, int windowUpdateFrequencyInSeconds) {
- return windowLengthInSeconds / windowUpdateFrequencyInSeconds;
- }
-
- @SuppressWarnings("rawtypes")
- @Override
- public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
- this.collector = collector;
- lastModifiedTracker = new NthLastModifiedTimeTracker(deriveNumWindowChunksFrom(this.windowLengthInSeconds,
- this.emitFrequencyInSeconds));
- }
-
- @Override
- public void execute(Tuple tuple) {
- if (TupleUtils.isTick(tuple)) {
- LOG.debug("Received tick tuple, triggering emit of current window counts");
- emitCurrentWindowCounts();
- }
- else {
- countObjAndAck(tuple);
- }
- }
-
- private void emitCurrentWindowCounts() {
- Map<Object, Long> counts = counter.getCountsThenAdvanceWindow();
- int actualWindowLengthInSeconds = lastModifiedTracker.secondsSinceOldestModification();
- lastModifiedTracker.markAsModified();
- if (actualWindowLengthInSeconds != windowLengthInSeconds) {
- LOG.warn(String.format(WINDOW_LENGTH_WARNING_TEMPLATE, actualWindowLengthInSeconds, windowLengthInSeconds));
- }
- emit(counts, actualWindowLengthInSeconds);
- }
-
- private void emit(Map<Object, Long> counts, int actualWindowLengthInSeconds) {
- for (Entry<Object, Long> entry : counts.entrySet()) {
- Object obj = entry.getKey();
- Long count = entry.getValue();
- collector.emit(new Values(obj, count, actualWindowLengthInSeconds));
- }
- }
-
- private void countObjAndAck(Tuple tuple) {
- Object obj = tuple.getValue(0);
- counter.incrementCount(obj);
- collector.ack(tuple);
- }
-
- @Override
- public void declareOutputFields(OutputFieldsDeclarer declarer) {
- declarer.declare(new Fields("obj", "count", "actualWindowLengthInSeconds"));
- }
-
- @Override
- public Map<String, Object> getComponentConfiguration() {
- Map<String, Object> conf = new HashMap<String, Object>();
- conf.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, emitFrequencyInSeconds);
- return conf;
- }
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/examples/storm-starter/src/jvm/storm/starter/bolt/SingleJoinBolt.java
----------------------------------------------------------------------
diff --git a/examples/storm-starter/src/jvm/storm/starter/bolt/SingleJoinBolt.java b/examples/storm-starter/src/jvm/storm/starter/bolt/SingleJoinBolt.java
deleted file mode 100644
index 85a7a26..0000000
--- a/examples/storm-starter/src/jvm/storm/starter/bolt/SingleJoinBolt.java
+++ /dev/null
@@ -1,114 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package storm.starter.bolt;
-
-import backtype.storm.Config;
-import backtype.storm.generated.GlobalStreamId;
-import backtype.storm.task.OutputCollector;
-import backtype.storm.task.TopologyContext;
-import backtype.storm.topology.OutputFieldsDeclarer;
-import backtype.storm.topology.base.BaseRichBolt;
-import backtype.storm.tuple.Fields;
-import backtype.storm.tuple.Tuple;
-import backtype.storm.utils.TimeCacheMap;
-
-import java.util.*;
-
-public class SingleJoinBolt extends BaseRichBolt {
- OutputCollector _collector;
- Fields _idFields;
- Fields _outFields;
- int _numSources;
- TimeCacheMap<List<Object>, Map<GlobalStreamId, Tuple>> _pending;
- Map<String, GlobalStreamId> _fieldLocations;
-
- public SingleJoinBolt(Fields outFields) {
- _outFields = outFields;
- }
-
- @Override
- public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
- _fieldLocations = new HashMap<String, GlobalStreamId>();
- _collector = collector;
- int timeout = ((Number) conf.get(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS)).intValue();
- _pending = new TimeCacheMap<List<Object>, Map<GlobalStreamId, Tuple>>(timeout, new ExpireCallback());
- _numSources = context.getThisSources().size();
- Set<String> idFields = null;
- for (GlobalStreamId source : context.getThisSources().keySet()) {
- Fields fields = context.getComponentOutputFields(source.get_componentId(), source.get_streamId());
- Set<String> setFields = new HashSet<String>(fields.toList());
- if (idFields == null)
- idFields = setFields;
- else
- idFields.retainAll(setFields);
-
- for (String outfield : _outFields) {
- for (String sourcefield : fields) {
- if (outfield.equals(sourcefield)) {
- _fieldLocations.put(outfield, source);
- }
- }
- }
- }
- _idFields = new Fields(new ArrayList<String>(idFields));
-
- if (_fieldLocations.size() != _outFields.size()) {
- throw new RuntimeException("Cannot find all outfields among sources");
- }
- }
-
- @Override
- public void execute(Tuple tuple) {
- List<Object> id = tuple.select(_idFields);
- GlobalStreamId streamId = new GlobalStreamId(tuple.getSourceComponent(), tuple.getSourceStreamId());
- if (!_pending.containsKey(id)) {
- _pending.put(id, new HashMap<GlobalStreamId, Tuple>());
- }
- Map<GlobalStreamId, Tuple> parts = _pending.get(id);
- if (parts.containsKey(streamId))
- throw new RuntimeException("Received same side of single join twice");
- parts.put(streamId, tuple);
- if (parts.size() == _numSources) {
- _pending.remove(id);
- List<Object> joinResult = new ArrayList<Object>();
- for (String outField : _outFields) {
- GlobalStreamId loc = _fieldLocations.get(outField);
- joinResult.add(parts.get(loc).getValueByField(outField));
- }
- _collector.emit(new ArrayList<Tuple>(parts.values()), joinResult);
-
- for (Tuple part : parts.values()) {
- _collector.ack(part);
- }
- }
- }
-
- @Override
- public void declareOutputFields(OutputFieldsDeclarer declarer) {
- declarer.declare(_outFields);
- }
-
- private class ExpireCallback implements TimeCacheMap.ExpiredCallback<List<Object>, Map<GlobalStreamId, Tuple>> {
- @Override
- public void expire(List<Object> id, Map<GlobalStreamId, Tuple> tuples) {
- for (Tuple tuple : tuples.values()) {
- _collector.fail(tuple);
- }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/examples/storm-starter/src/jvm/storm/starter/bolt/SlidingWindowSumBolt.java
----------------------------------------------------------------------
diff --git a/examples/storm-starter/src/jvm/storm/starter/bolt/SlidingWindowSumBolt.java b/examples/storm-starter/src/jvm/storm/starter/bolt/SlidingWindowSumBolt.java
deleted file mode 100644
index ef3a0b8..0000000
--- a/examples/storm-starter/src/jvm/storm/starter/bolt/SlidingWindowSumBolt.java
+++ /dev/null
@@ -1,80 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package storm.starter.bolt;
-
-import backtype.storm.task.OutputCollector;
-import backtype.storm.task.TopologyContext;
-import backtype.storm.topology.OutputFieldsDeclarer;
-import backtype.storm.topology.base.BaseWindowedBolt;
-import backtype.storm.tuple.Fields;
-import backtype.storm.tuple.Tuple;
-import backtype.storm.tuple.Values;
-import backtype.storm.windowing.TupleWindow;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.List;
-import java.util.Map;
-
-/**
- * Computes sliding window sum
- */
-public class SlidingWindowSumBolt extends BaseWindowedBolt {
- private static final Logger LOG = LoggerFactory.getLogger(SlidingWindowSumBolt.class);
-
- private int sum = 0;
- private OutputCollector collector;
-
- @Override
- public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
- this.collector = collector;
- }
-
- @Override
- public void execute(TupleWindow inputWindow) {
- /*
- * The inputWindow gives a view of
- * (a) all the events in the window
- * (b) events that expired since last activation of the window
- * (c) events that newly arrived since last activation of the window
- */
- List<Tuple> tuplesInWindow = inputWindow.get();
- List<Tuple> newTuples = inputWindow.getNew();
- List<Tuple> expiredTuples = inputWindow.getExpired();
-
- LOG.debug("Events in current window: " + tuplesInWindow.size());
- /*
- * Instead of iterating over all the tuples in the window to compute
- * the sum, the values for the new events are added and old events are
- * subtracted. Similar optimizations might be possible in other
- * windowing computations.
- */
- for (Tuple tuple : newTuples) {
- sum += (int) tuple.getValue(0);
- }
- for (Tuple tuple : expiredTuples) {
- sum -= (int) tuple.getValue(0);
- }
- collector.emit(new Values(sum));
- }
-
- @Override
- public void declareOutputFields(OutputFieldsDeclarer declarer) {
- declarer.declare(new Fields("sum"));
- }
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/examples/storm-starter/src/jvm/storm/starter/bolt/TotalRankingsBolt.java
----------------------------------------------------------------------
diff --git a/examples/storm-starter/src/jvm/storm/starter/bolt/TotalRankingsBolt.java b/examples/storm-starter/src/jvm/storm/starter/bolt/TotalRankingsBolt.java
deleted file mode 100644
index 0e1bb05..0000000
--- a/examples/storm-starter/src/jvm/storm/starter/bolt/TotalRankingsBolt.java
+++ /dev/null
@@ -1,59 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package storm.starter.bolt;
-
-import backtype.storm.tuple.Tuple;
-import org.apache.log4j.Logger;
-import storm.starter.tools.Rankings;
-
-/**
- * This bolt merges incoming {@link Rankings}.
- * <p/>
- * It can be used to merge intermediate rankings generated by {@link IntermediateRankingsBolt} into a final,
- * consolidated ranking. To do so, configure this bolt with a globalGrouping on {@link IntermediateRankingsBolt}.
- */
-public final class TotalRankingsBolt extends AbstractRankerBolt {
-
- private static final long serialVersionUID = -8447525895532302198L;
- private static final Logger LOG = Logger.getLogger(TotalRankingsBolt.class);
-
- public TotalRankingsBolt() {
- super();
- }
-
- public TotalRankingsBolt(int topN) {
- super(topN);
- }
-
- public TotalRankingsBolt(int topN, int emitFrequencyInSeconds) {
- super(topN, emitFrequencyInSeconds);
- }
-
- @Override
- void updateRankingsWithTuple(Tuple tuple) {
- Rankings rankingsToBeMerged = (Rankings) tuple.getValue(0);
- super.getRankings().updateWith(rankingsToBeMerged);
- super.getRankings().pruneZeroCounts();
- }
-
- @Override
- Logger getLogger() {
- return LOG;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/examples/storm-starter/src/jvm/storm/starter/spout/RandomIntegerSpout.java
----------------------------------------------------------------------
diff --git a/examples/storm-starter/src/jvm/storm/starter/spout/RandomIntegerSpout.java b/examples/storm-starter/src/jvm/storm/starter/spout/RandomIntegerSpout.java
deleted file mode 100644
index 5778c8e..0000000
--- a/examples/storm-starter/src/jvm/storm/starter/spout/RandomIntegerSpout.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package storm.starter.spout;
-
-import backtype.storm.spout.SpoutOutputCollector;
-import backtype.storm.task.TopologyContext;
-import backtype.storm.topology.OutputFieldsDeclarer;
-import backtype.storm.topology.base.BaseRichSpout;
-import backtype.storm.tuple.Fields;
-import backtype.storm.tuple.Values;
-import backtype.storm.utils.Utils;
-
-import java.util.Map;
-import java.util.Random;
-
-/**
- * Emits a random integer and a timestamp value (offset by one day),
- * every 100 ms. The ts field can be used in tuple time based windowing.
- */
-public class RandomIntegerSpout extends BaseRichSpout {
- private SpoutOutputCollector collector;
- private Random rand;
-
- @Override
- public void declareOutputFields(OutputFieldsDeclarer declarer) {
- declarer.declare(new Fields("value", "ts"));
- }
-
- @Override
- public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
- this.collector = collector;
- this.rand = new Random();
- }
-
- @Override
- public void nextTuple() {
- Utils.sleep(100);
- collector.emit(new Values(rand.nextInt(1000), System.currentTimeMillis() - (24 * 60 * 60 * 1000)));
- }
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/examples/storm-starter/src/jvm/storm/starter/spout/RandomSentenceSpout.java
----------------------------------------------------------------------
diff --git a/examples/storm-starter/src/jvm/storm/starter/spout/RandomSentenceSpout.java b/examples/storm-starter/src/jvm/storm/starter/spout/RandomSentenceSpout.java
deleted file mode 100644
index 813b10c..0000000
--- a/examples/storm-starter/src/jvm/storm/starter/spout/RandomSentenceSpout.java
+++ /dev/null
@@ -1,64 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package storm.starter.spout;
-
-import backtype.storm.spout.SpoutOutputCollector;
-import backtype.storm.task.TopologyContext;
-import backtype.storm.topology.OutputFieldsDeclarer;
-import backtype.storm.topology.base.BaseRichSpout;
-import backtype.storm.tuple.Fields;
-import backtype.storm.tuple.Values;
-import backtype.storm.utils.Utils;
-
-import java.util.Map;
-import java.util.Random;
-
-public class RandomSentenceSpout extends BaseRichSpout {
- SpoutOutputCollector _collector;
- Random _rand;
-
-
- @Override
- public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
- _collector = collector;
- _rand = new Random();
- }
-
- @Override
- public void nextTuple() {
- Utils.sleep(100);
- String[] sentences = new String[]{ "the cow jumped over the moon", "an apple a day keeps the doctor away",
- "four score and seven years ago", "snow white and the seven dwarfs", "i am at two with nature" };
- String sentence = sentences[_rand.nextInt(sentences.length)];
- _collector.emit(new Values(sentence));
- }
-
- @Override
- public void ack(Object id) {
- }
-
- @Override
- public void fail(Object id) {
- }
-
- @Override
- public void declareOutputFields(OutputFieldsDeclarer declarer) {
- declarer.declare(new Fields("word"));
- }
-
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/examples/storm-starter/src/jvm/storm/starter/spout/TwitterSampleSpout.java
----------------------------------------------------------------------
diff --git a/examples/storm-starter/src/jvm/storm/starter/spout/TwitterSampleSpout.java b/examples/storm-starter/src/jvm/storm/starter/spout/TwitterSampleSpout.java
deleted file mode 100644
index 40f8d72..0000000
--- a/examples/storm-starter/src/jvm/storm/starter/spout/TwitterSampleSpout.java
+++ /dev/null
@@ -1,164 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package storm.starter.spout;
-
-import java.util.Map;
-import java.util.concurrent.LinkedBlockingQueue;
-
-import twitter4j.FilterQuery;
-import twitter4j.StallWarning;
-import twitter4j.Status;
-import twitter4j.StatusDeletionNotice;
-import twitter4j.StatusListener;
-import twitter4j.TwitterStream;
-import twitter4j.TwitterStreamFactory;
-import twitter4j.auth.AccessToken;
-import twitter4j.conf.ConfigurationBuilder;
-
-import backtype.storm.Config;
-import backtype.storm.spout.SpoutOutputCollector;
-import backtype.storm.task.TopologyContext;
-import backtype.storm.topology.OutputFieldsDeclarer;
-import backtype.storm.topology.base.BaseRichSpout;
-import backtype.storm.tuple.Fields;
-import backtype.storm.tuple.Values;
-import backtype.storm.utils.Utils;
-
-@SuppressWarnings("serial")
-public class TwitterSampleSpout extends BaseRichSpout {
-
- SpoutOutputCollector _collector;
- LinkedBlockingQueue<Status> queue = null;
- TwitterStream _twitterStream;
- String consumerKey;
- String consumerSecret;
- String accessToken;
- String accessTokenSecret;
- String[] keyWords;
-
- public TwitterSampleSpout(String consumerKey, String consumerSecret,
- String accessToken, String accessTokenSecret, String[] keyWords) {
- this.consumerKey = consumerKey;
- this.consumerSecret = consumerSecret;
- this.accessToken = accessToken;
- this.accessTokenSecret = accessTokenSecret;
- this.keyWords = keyWords;
- }
-
- public TwitterSampleSpout() {
- // TODO Auto-generated constructor stub
- }
-
- @Override
- public void open(Map conf, TopologyContext context,
- SpoutOutputCollector collector) {
- queue = new LinkedBlockingQueue<Status>(1000);
- _collector = collector;
-
- StatusListener listener = new StatusListener() {
-
- @Override
- public void onStatus(Status status) {
-
- queue.offer(status);
- }
-
- @Override
- public void onDeletionNotice(StatusDeletionNotice sdn) {
- }
-
- @Override
- public void onTrackLimitationNotice(int i) {
- }
-
- @Override
- public void onScrubGeo(long l, long l1) {
- }
-
- @Override
- public void onException(Exception ex) {
- }
-
- @Override
- public void onStallWarning(StallWarning arg0) {
- // TODO Auto-generated method stub
-
- }
-
- };
-
- TwitterStream twitterStream = new TwitterStreamFactory(
- new ConfigurationBuilder().setJSONStoreEnabled(true).build())
- .getInstance();
-
- twitterStream.addListener(listener);
- twitterStream.setOAuthConsumer(consumerKey, consumerSecret);
- AccessToken token = new AccessToken(accessToken, accessTokenSecret);
- twitterStream.setOAuthAccessToken(token);
-
- if (keyWords.length == 0) {
-
- twitterStream.sample();
- }
-
- else {
-
- FilterQuery query = new FilterQuery().track(keyWords);
- twitterStream.filter(query);
- }
-
- }
-
- @Override
- public void nextTuple() {
- Status ret = queue.poll();
- if (ret == null) {
- Utils.sleep(50);
- } else {
- _collector.emit(new Values(ret));
-
- }
- }
-
- @Override
- public void close() {
- _twitterStream.shutdown();
- }
-
- @Override
- public Map<String, Object> getComponentConfiguration() {
- Config ret = new Config();
- ret.setMaxTaskParallelism(1);
- return ret;
- }
-
- @Override
- public void ack(Object id) {
- }
-
- @Override
- public void fail(Object id) {
- }
-
- @Override
- public void declareOutputFields(OutputFieldsDeclarer declarer) {
- declarer.declare(new Fields("tweet"));
- }
-
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/examples/storm-starter/src/jvm/storm/starter/tools/NthLastModifiedTimeTracker.java
----------------------------------------------------------------------
diff --git a/examples/storm-starter/src/jvm/storm/starter/tools/NthLastModifiedTimeTracker.java b/examples/storm-starter/src/jvm/storm/starter/tools/NthLastModifiedTimeTracker.java
deleted file mode 100644
index 08df8cf..0000000
--- a/examples/storm-starter/src/jvm/storm/starter/tools/NthLastModifiedTimeTracker.java
+++ /dev/null
@@ -1,70 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package storm.starter.tools;
-
-import backtype.storm.utils.Time;
-import org.apache.commons.collections.buffer.CircularFifoBuffer;
-
-/**
- * This class tracks the time-since-last-modify of a "thing" in a rolling fashion.
- * <p/>
- * For example, create a 5-slot tracker to track the five most recent time-since-last-modify.
- * <p/>
- * You must manually "mark" that the "something" that you want to track -- in terms of modification times -- has just
- * been modified.
- */
-public class NthLastModifiedTimeTracker {
-
- private static final int MILLIS_IN_SEC = 1000;
-
- private final CircularFifoBuffer lastModifiedTimesMillis;
-
- public NthLastModifiedTimeTracker(int numTimesToTrack) {
- if (numTimesToTrack < 1) {
- throw new IllegalArgumentException(
- "numTimesToTrack must be greater than zero (you requested " + numTimesToTrack + ")");
- }
- lastModifiedTimesMillis = new CircularFifoBuffer(numTimesToTrack);
- initLastModifiedTimesMillis();
- }
-
- private void initLastModifiedTimesMillis() {
- long nowCached = now();
- for (int i = 0; i < lastModifiedTimesMillis.maxSize(); i++) {
- lastModifiedTimesMillis.add(Long.valueOf(nowCached));
- }
- }
-
- private long now() {
- return Time.currentTimeMillis();
- }
-
- public int secondsSinceOldestModification() {
- long modifiedTimeMillis = ((Long) lastModifiedTimesMillis.get()).longValue();
- return (int) ((now() - modifiedTimeMillis) / MILLIS_IN_SEC);
- }
-
- public void markAsModified() {
- updateLastModifiedTime();
- }
-
- private void updateLastModifiedTime() {
- lastModifiedTimesMillis.add(now());
- }
-
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/examples/storm-starter/src/jvm/storm/starter/tools/Rankable.java
----------------------------------------------------------------------
diff --git a/examples/storm-starter/src/jvm/storm/starter/tools/Rankable.java b/examples/storm-starter/src/jvm/storm/starter/tools/Rankable.java
deleted file mode 100644
index 85e3d1d..0000000
--- a/examples/storm-starter/src/jvm/storm/starter/tools/Rankable.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package storm.starter.tools;
-
-public interface Rankable extends Comparable<Rankable> {
-
- Object getObject();
-
- long getCount();
-
- /**
- * Note: We do not defensively copy the object wrapped by the Rankable. It is passed as is.
- *
- * @return a defensive copy
- */
- Rankable copy();
-}