You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by bo...@apache.org on 2016/01/11 21:57:43 UTC
[47/53] [abbrv] [partial] storm git commit: STORM-1202: Migrate APIs
to org.apache.storm, but try to provide some form of backwards compatibility
http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/examples/storm-starter/src/jvm/storm/starter/BlobStoreAPIWordCountTopology.java
----------------------------------------------------------------------
diff --git a/examples/storm-starter/src/jvm/storm/starter/BlobStoreAPIWordCountTopology.java b/examples/storm-starter/src/jvm/storm/starter/BlobStoreAPIWordCountTopology.java
deleted file mode 100644
index 250c418..0000000
--- a/examples/storm-starter/src/jvm/storm/starter/BlobStoreAPIWordCountTopology.java
+++ /dev/null
@@ -1,304 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package storm.starter;
-
-import backtype.storm.Config;
-import backtype.storm.StormSubmitter;
-import backtype.storm.blobstore.AtomicOutputStream;
-import backtype.storm.blobstore.ClientBlobStore;
-import backtype.storm.blobstore.InputStreamWithMeta;
-import backtype.storm.blobstore.NimbusBlobStore;
-
-import backtype.storm.generated.AccessControl;
-import backtype.storm.generated.AccessControlType;
-import backtype.storm.generated.AlreadyAliveException;
-import backtype.storm.generated.AuthorizationException;
-import backtype.storm.generated.InvalidTopologyException;
-import backtype.storm.generated.KeyAlreadyExistsException;
-import backtype.storm.generated.KeyNotFoundException;
-import backtype.storm.generated.SettableBlobMeta;
-import backtype.storm.spout.SpoutOutputCollector;
-import backtype.storm.task.ShellBolt;
-import backtype.storm.task.TopologyContext;
-import backtype.storm.topology.BasicOutputCollector;
-import backtype.storm.topology.IRichBolt;
-import backtype.storm.topology.OutputFieldsDeclarer;
-import backtype.storm.topology.TopologyBuilder;
-import backtype.storm.topology.base.BaseBasicBolt;
-import backtype.storm.topology.base.BaseRichSpout;
-import backtype.storm.blobstore.BlobStoreAclHandler;
-import backtype.storm.tuple.Fields;
-import backtype.storm.tuple.Tuple;
-import backtype.storm.tuple.Values;
-import backtype.storm.utils.Utils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.BufferedReader;
-import java.io.BufferedWriter;
-import java.io.File;
-import java.io.FileReader;
-import java.io.FileWriter;
-import java.io.IOException;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.Random;
-import java.util.Set;
-import java.util.StringTokenizer;
-
-public class BlobStoreAPIWordCountTopology {
- private static ClientBlobStore store; // Client API to invoke blob store API functionality
- private static String key = "key";
- private static String fileName = "blacklist.txt";
- private static final Logger LOG = LoggerFactory.getLogger(BlobStoreAPIWordCountTopology.class);
-
- public static void prepare() {
- Config conf = new Config();
- conf.putAll(Utils.readStormConfig());
- store = Utils.getClientBlobStore(conf);
- }
-
- // Spout implementation
- public static class RandomSentenceSpout extends BaseRichSpout {
- SpoutOutputCollector _collector;
-
- @Override
- public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
- _collector = collector;
- }
-
- @Override
- public void nextTuple() {
- Utils.sleep(100);
- _collector.emit(new Values(getRandomSentence()));
- }
-
- @Override
- public void ack(Object id) {
- }
-
- @Override
- public void fail(Object id) {
- }
-
- @Override
- public void declareOutputFields(OutputFieldsDeclarer declarer) {
- declarer.declare(new Fields("sentence"));
- }
-
- }
-
- // Bolt implementation
- public static class SplitSentence extends ShellBolt implements IRichBolt {
-
- public SplitSentence() {
- super("python", "splitsentence.py");
- }
-
- @Override
- public void declareOutputFields(OutputFieldsDeclarer declarer) {
- declarer.declare(new Fields("word"));
- }
-
- @Override
- public Map<String, Object> getComponentConfiguration() {
- return null;
- }
- }
-
- public static class FilterWords extends BaseBasicBolt {
- boolean poll = false;
- long pollTime;
- Set<String> wordSet;
- @Override
- public void execute(Tuple tuple, BasicOutputCollector collector) {
- String word = tuple.getString(0);
- // Thread Polling every 5 seconds to update the wordSet seconds which is
- // used in FilterWords bolt to filter the words
- try {
- if (!poll) {
- wordSet = parseFile(fileName);
- pollTime = System.currentTimeMillis();
- poll = true;
- } else {
- if ((System.currentTimeMillis() - pollTime) > 5000) {
- wordSet = parseFile(fileName);
- pollTime = System.currentTimeMillis();
- }
- }
- } catch (IOException exp) {
- throw new RuntimeException(exp);
- }
- if (wordSet !=null && !wordSet.contains(word)) {
- collector.emit(new Values(word));
- }
- }
-
- @Override
- public void declareOutputFields(OutputFieldsDeclarer declarer) {
- declarer.declare(new Fields("word"));
- }
- }
-
- public void buildAndLaunchWordCountTopology(String[] args) {
- TopologyBuilder builder = new TopologyBuilder();
- builder.setSpout("spout", new RandomSentenceSpout(), 5);
- builder.setBolt("split", new SplitSentence(), 8).shuffleGrouping("spout");
- builder.setBolt("filter", new FilterWords(), 6).shuffleGrouping("split");
-
- Config conf = new Config();
- conf.setDebug(true);
- try {
- conf.setNumWorkers(3);
- StormSubmitter.submitTopologyWithProgressBar(args[0], conf, builder.createTopology());
- } catch (InvalidTopologyException | AuthorizationException | AlreadyAliveException exp) {
- throw new RuntimeException(exp);
- }
- }
-
- // Equivalent create command on command line
- // storm blobstore create --file blacklist.txt --acl o::rwa key
- private static void createBlobWithContent(String blobKey, ClientBlobStore clientBlobStore, File file)
- throws AuthorizationException, KeyAlreadyExistsException, IOException,KeyNotFoundException {
- String stringBlobACL = "o::rwa";
- AccessControl blobACL = BlobStoreAclHandler.parseAccessControl(stringBlobACL);
- List<AccessControl> acls = new LinkedList<AccessControl>();
- acls.add(blobACL); // more ACLs can be added here
- SettableBlobMeta settableBlobMeta = new SettableBlobMeta(acls);
- AtomicOutputStream blobStream = clientBlobStore.createBlob(blobKey,settableBlobMeta);
- blobStream.write(readFile(file).toString().getBytes());
- blobStream.close();
- }
-
- // Equivalent update command on command line
- // storm blobstore update --file blacklist.txt key
- private static void updateBlobWithContent(String blobKey, ClientBlobStore clientBlobStore, File file)
- throws KeyNotFoundException, AuthorizationException, IOException {
- AtomicOutputStream blobOutputStream = clientBlobStore.updateBlob(blobKey);
- blobOutputStream.write(readFile(file).toString().getBytes());
- blobOutputStream.close();
- }
-
- private static String getRandomSentence() {
- String[] sentences = new String[]{ "the cow jumped over the moon", "an apple a day keeps the doctor away",
- "four score and seven years ago", "snow white and the seven dwarfs", "i am at two with nature" };
- String sentence = sentences[new Random().nextInt(sentences.length)];
- return sentence;
- }
-
- private static Set<String> getRandomWordSet() {
- Set<String> randomWordSet = new HashSet<>();
- Random random = new Random();
- String[] words = new String[]{ "cow", "jumped", "over", "the", "moon", "apple", "day", "doctor", "away",
- "four", "seven", "ago", "snow", "white", "seven", "dwarfs", "nature", "two" };
- // Choosing atmost 5 words to update the blacklist file for filtering
- for (int i=0; i<5; i++) {
- randomWordSet.add(words[random.nextInt(words.length)]);
- }
- return randomWordSet;
- }
-
- private static Set<String> parseFile(String fileName) throws IOException {
- File file = new File(fileName);
- Set<String> wordSet = new HashSet<>();
- if (!file.exists()) {
- return wordSet;
- }
- StringTokenizer tokens = new StringTokenizer(readFile(file).toString(), "\r\n");
- while (tokens.hasMoreElements()) {
- wordSet.add(tokens.nextToken());
- }
- LOG.debug("parseFile {}", wordSet);
- return wordSet;
- }
-
- private static StringBuilder readFile(File file) throws IOException {
- String line;
- StringBuilder fileContent = new StringBuilder();
- // Do not use canonical file name here as we are using
- // symbolic links to read file data and performing atomic move
- // while updating files
- BufferedReader br = new BufferedReader(new FileReader(file));
- while ((line = br.readLine()) != null) {
- fileContent.append(line);
- fileContent.append(System.lineSeparator());
- }
- return fileContent;
- }
-
- // Creating a blacklist file to read from the disk
- public static File createFile(String fileName) throws IOException {
- File file = null;
- file = new File(fileName);
- if (!file.exists()) {
- file.createNewFile();
- }
- writeToFile(file, getRandomWordSet());
- return file;
- }
-
- // Updating a blacklist file periodically with random words
- public static File updateFile(File file) throws IOException {
- writeToFile(file, getRandomWordSet());
- return file;
- }
-
- // Writing random words to be blacklisted
- public static void writeToFile(File file, Set<String> content) throws IOException{
- FileWriter fw = new FileWriter(file, false);
- BufferedWriter bw = new BufferedWriter(fw);
- Iterator<String> iter = content.iterator();
- while(iter.hasNext()) {
- bw.write(iter.next());
- bw.write(System.lineSeparator());
- }
- bw.close();
- }
-
- public static void main(String[] args) {
- prepare();
- BlobStoreAPIWordCountTopology wc = new BlobStoreAPIWordCountTopology();
- try {
- File file = createFile(fileName);
- // Creating blob again before launching topology
- createBlobWithContent(key, store, file);
-
- // Blostore launch command with topology blobstore map
- // Here we are giving it a local name so that we can read from the file
- // bin/storm jar examples/storm-starter/storm-starter-topologies-0.11.0-SNAPSHOT.jar
- // storm.starter.BlobStoreAPIWordCountTopology bl -c
- // topology.blobstore.map='{"key":{"localname":"blacklist.txt", "uncompress":"false"}}'
- wc.buildAndLaunchWordCountTopology(args);
-
- // Updating file few times every 5 seconds
- for(int i=0; i<10; i++) {
- updateBlobWithContent(key, store, updateFile(file));
- Utils.sleep(5000);
- }
- } catch (KeyAlreadyExistsException kae) {
- LOG.info("Key already exists {}", kae);
- } catch (AuthorizationException | KeyNotFoundException | IOException exp) {
- throw new RuntimeException(exp);
- }
- }
-}
-
-
http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/examples/storm-starter/src/jvm/storm/starter/ExclamationTopology.java
----------------------------------------------------------------------
diff --git a/examples/storm-starter/src/jvm/storm/starter/ExclamationTopology.java b/examples/storm-starter/src/jvm/storm/starter/ExclamationTopology.java
deleted file mode 100644
index d7b1b3e..0000000
--- a/examples/storm-starter/src/jvm/storm/starter/ExclamationTopology.java
+++ /dev/null
@@ -1,87 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package storm.starter;
-
-import backtype.storm.Config;
-import backtype.storm.LocalCluster;
-import backtype.storm.StormSubmitter;
-import backtype.storm.task.OutputCollector;
-import backtype.storm.task.TopologyContext;
-import backtype.storm.testing.TestWordSpout;
-import backtype.storm.topology.OutputFieldsDeclarer;
-import backtype.storm.topology.TopologyBuilder;
-import backtype.storm.topology.base.BaseRichBolt;
-import backtype.storm.tuple.Fields;
-import backtype.storm.tuple.Tuple;
-import backtype.storm.tuple.Values;
-import backtype.storm.utils.Utils;
-
-import java.util.Map;
-
-/**
- * This is a basic example of a Storm topology.
- */
-public class ExclamationTopology {
-
- public static class ExclamationBolt extends BaseRichBolt {
- OutputCollector _collector;
-
- @Override
- public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
- _collector = collector;
- }
-
- @Override
- public void execute(Tuple tuple) {
- _collector.emit(tuple, new Values(tuple.getString(0) + "!!!"));
- _collector.ack(tuple);
- }
-
- @Override
- public void declareOutputFields(OutputFieldsDeclarer declarer) {
- declarer.declare(new Fields("word"));
- }
-
-
- }
-
- public static void main(String[] args) throws Exception {
- TopologyBuilder builder = new TopologyBuilder();
-
- builder.setSpout("word", new TestWordSpout(), 10);
- builder.setBolt("exclaim1", new ExclamationBolt(), 3).shuffleGrouping("word");
- builder.setBolt("exclaim2", new ExclamationBolt(), 2).shuffleGrouping("exclaim1");
-
- Config conf = new Config();
- conf.setDebug(true);
-
- if (args != null && args.length > 0) {
- conf.setNumWorkers(3);
-
- StormSubmitter.submitTopologyWithProgressBar(args[0], conf, builder.createTopology());
- }
- else {
-
- LocalCluster cluster = new LocalCluster();
- cluster.submitTopology("test", conf, builder.createTopology());
- Utils.sleep(10000);
- cluster.killTopology("test");
- cluster.shutdown();
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/examples/storm-starter/src/jvm/storm/starter/FastWordCountTopology.java
----------------------------------------------------------------------
diff --git a/examples/storm-starter/src/jvm/storm/starter/FastWordCountTopology.java b/examples/storm-starter/src/jvm/storm/starter/FastWordCountTopology.java
deleted file mode 100644
index 8f78abd..0000000
--- a/examples/storm-starter/src/jvm/storm/starter/FastWordCountTopology.java
+++ /dev/null
@@ -1,198 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package storm.starter;
-
-import backtype.storm.Config;
-import backtype.storm.StormSubmitter;
-import backtype.storm.generated.*;
-import backtype.storm.spout.SpoutOutputCollector;
-import backtype.storm.task.TopologyContext;
-import backtype.storm.topology.BasicOutputCollector;
-import backtype.storm.topology.IRichBolt;
-import backtype.storm.topology.OutputFieldsDeclarer;
-import backtype.storm.topology.TopologyBuilder;
-import backtype.storm.topology.base.BaseBasicBolt;
-import backtype.storm.topology.base.BaseRichSpout;
-import backtype.storm.tuple.Fields;
-import backtype.storm.tuple.Tuple;
-import backtype.storm.tuple.Values;
-import backtype.storm.utils.NimbusClient;
-import backtype.storm.utils.Utils;
-
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Random;
-import java.util.concurrent.ThreadLocalRandom;
-
-/**
- * WordCount but teh spout does not stop, and the bolts are implemented in
- * java. This can show how fast the word count can run.
- */
-public class FastWordCountTopology {
- public static class FastRandomSentenceSpout extends BaseRichSpout {
- SpoutOutputCollector _collector;
- Random _rand;
- private static final String[] CHOICES = {
- "marry had a little lamb whos fleese was white as snow",
- "and every where that marry went the lamb was sure to go",
- "one two three four five six seven eight nine ten",
- "this is a test of the emergency broadcast system this is only a test",
- "peter piper picked a peck of pickeled peppers"
- };
-
- @Override
- public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
- _collector = collector;
- _rand = ThreadLocalRandom.current();
- }
-
- @Override
- public void nextTuple() {
- String sentence = CHOICES[_rand.nextInt(CHOICES.length)];
- _collector.emit(new Values(sentence), sentence);
- }
-
- @Override
- public void ack(Object id) {
- //Ignored
- }
-
- @Override
- public void fail(Object id) {
- _collector.emit(new Values(id), id);
- }
-
- @Override
- public void declareOutputFields(OutputFieldsDeclarer declarer) {
- declarer.declare(new Fields("sentence"));
- }
- }
-
- public static class SplitSentence extends BaseBasicBolt {
- @Override
- public void execute(Tuple tuple, BasicOutputCollector collector) {
- String sentence = tuple.getString(0);
- for (String word: sentence.split("\\s+")) {
- collector.emit(new Values(word, 1));
- }
- }
-
- @Override
- public void declareOutputFields(OutputFieldsDeclarer declarer) {
- declarer.declare(new Fields("word", "count"));
- }
- }
-
- public static class WordCount extends BaseBasicBolt {
- Map<String, Integer> counts = new HashMap<String, Integer>();
-
- @Override
- public void execute(Tuple tuple, BasicOutputCollector collector) {
- String word = tuple.getString(0);
- Integer count = counts.get(word);
- if (count == null)
- count = 0;
- count++;
- counts.put(word, count);
- collector.emit(new Values(word, count));
- }
-
- @Override
- public void declareOutputFields(OutputFieldsDeclarer declarer) {
- declarer.declare(new Fields("word", "count"));
- }
- }
-
- public static void printMetrics(Nimbus.Client client, String name) throws Exception {
- ClusterSummary summary = client.getClusterInfo();
- String id = null;
- for (TopologySummary ts: summary.get_topologies()) {
- if (name.equals(ts.get_name())) {
- id = ts.get_id();
- }
- }
- if (id == null) {
- throw new Exception("Could not find a topology named "+name);
- }
- TopologyInfo info = client.getTopologyInfo(id);
- int uptime = info.get_uptime_secs();
- long acked = 0;
- long failed = 0;
- double weightedAvgTotal = 0.0;
- for (ExecutorSummary exec: info.get_executors()) {
- if ("spout".equals(exec.get_component_id())) {
- SpoutStats stats = exec.get_stats().get_specific().get_spout();
- Map<String, Long> failedMap = stats.get_failed().get(":all-time");
- Map<String, Long> ackedMap = stats.get_acked().get(":all-time");
- Map<String, Double> avgLatMap = stats.get_complete_ms_avg().get(":all-time");
- for (String key: ackedMap.keySet()) {
- if (failedMap != null) {
- Long tmp = failedMap.get(key);
- if (tmp != null) {
- failed += tmp;
- }
- }
- long ackVal = ackedMap.get(key);
- double latVal = avgLatMap.get(key) * ackVal;
- acked += ackVal;
- weightedAvgTotal += latVal;
- }
- }
- }
- double avgLatency = weightedAvgTotal/acked;
- System.out.println("uptime: "+uptime+" acked: "+acked+" avgLatency: "+avgLatency+" acked/sec: "+(((double)acked)/uptime+" failed: "+failed));
- }
-
- public static void kill(Nimbus.Client client, String name) throws Exception {
- KillOptions opts = new KillOptions();
- opts.set_wait_secs(0);
- client.killTopologyWithOpts(name, opts);
- }
-
- public static void main(String[] args) throws Exception {
-
- TopologyBuilder builder = new TopologyBuilder();
-
- builder.setSpout("spout", new FastRandomSentenceSpout(), 4);
-
- builder.setBolt("split", new SplitSentence(), 4).shuffleGrouping("spout");
- builder.setBolt("count", new WordCount(), 4).fieldsGrouping("split", new Fields("word"));
-
- Config conf = new Config();
- conf.registerMetricsConsumer(backtype.storm.metric.LoggingMetricsConsumer.class);
-
- String name = "wc-test";
- if (args != null && args.length > 0) {
- name = args[0];
- }
-
- conf.setNumWorkers(1);
- StormSubmitter.submitTopologyWithProgressBar(name, conf, builder.createTopology());
-
- Map clusterConf = Utils.readStormConfig();
- clusterConf.putAll(Utils.readCommandLineOpts());
- Nimbus.Client client = NimbusClient.getConfiguredClient(clusterConf).getClient();
-
- //Sleep for 5 mins
- for (int i = 0; i < 10; i++) {
- Thread.sleep(30 * 1000);
- printMetrics(client, name);
- }
- kill(client, name);
- }
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/examples/storm-starter/src/jvm/storm/starter/InOrderDeliveryTest.java
----------------------------------------------------------------------
diff --git a/examples/storm-starter/src/jvm/storm/starter/InOrderDeliveryTest.java b/examples/storm-starter/src/jvm/storm/starter/InOrderDeliveryTest.java
deleted file mode 100644
index 5df0688..0000000
--- a/examples/storm-starter/src/jvm/storm/starter/InOrderDeliveryTest.java
+++ /dev/null
@@ -1,175 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package storm.starter;
-
-import backtype.storm.Config;
-import backtype.storm.StormSubmitter;
-import backtype.storm.generated.*;
-import backtype.storm.spout.SpoutOutputCollector;
-import backtype.storm.task.TopologyContext;
-import backtype.storm.topology.BasicOutputCollector;
-import backtype.storm.topology.FailedException;
-import backtype.storm.topology.IRichBolt;
-import backtype.storm.topology.OutputFieldsDeclarer;
-import backtype.storm.topology.TopologyBuilder;
-import backtype.storm.topology.base.BaseBasicBolt;
-import backtype.storm.topology.base.BaseRichSpout;
-import backtype.storm.tuple.Fields;
-import backtype.storm.tuple.Tuple;
-import backtype.storm.tuple.Values;
-import backtype.storm.utils.NimbusClient;
-import backtype.storm.utils.Utils;
-
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Random;
-
-public class InOrderDeliveryTest {
- public static class InOrderSpout extends BaseRichSpout {
- SpoutOutputCollector _collector;
- int _base = 0;
- int _i = 0;
-
- @Override
- public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
- _collector = collector;
- _base = context.getThisTaskIndex();
- }
-
- @Override
- public void nextTuple() {
- Values v = new Values(_base, _i);
- _collector.emit(v, "ACK");
- _i++;
- }
-
- @Override
- public void ack(Object id) {
- //Ignored
- }
-
- @Override
- public void fail(Object id) {
- //Ignored
- }
-
- @Override
- public void declareOutputFields(OutputFieldsDeclarer declarer) {
- declarer.declare(new Fields("c1", "c2"));
- }
- }
-
- public static class Check extends BaseBasicBolt {
- Map<Integer, Integer> expected = new HashMap<Integer, Integer>();
-
- @Override
- public void execute(Tuple tuple, BasicOutputCollector collector) {
- Integer c1 = tuple.getInteger(0);
- Integer c2 = tuple.getInteger(1);
- Integer exp = expected.get(c1);
- if (exp == null) exp = 0;
- if (c2.intValue() != exp.intValue()) {
- System.out.println(c1+" "+c2+" != "+exp);
- throw new FailedException(c1+" "+c2+" != "+exp);
- }
- exp = c2 + 1;
- expected.put(c1, exp);
- }
-
- @Override
- public void declareOutputFields(OutputFieldsDeclarer declarer) {
- //Empty
- }
- }
-
- public static void printMetrics(Nimbus.Client client, String name) throws Exception {
- ClusterSummary summary = client.getClusterInfo();
- String id = null;
- for (TopologySummary ts: summary.get_topologies()) {
- if (name.equals(ts.get_name())) {
- id = ts.get_id();
- }
- }
- if (id == null) {
- throw new Exception("Could not find a topology named "+name);
- }
- TopologyInfo info = client.getTopologyInfo(id);
- int uptime = info.get_uptime_secs();
- long acked = 0;
- long failed = 0;
- double weightedAvgTotal = 0.0;
- for (ExecutorSummary exec: info.get_executors()) {
- if ("spout".equals(exec.get_component_id())) {
- SpoutStats stats = exec.get_stats().get_specific().get_spout();
- Map<String, Long> failedMap = stats.get_failed().get(":all-time");
- Map<String, Long> ackedMap = stats.get_acked().get(":all-time");
- Map<String, Double> avgLatMap = stats.get_complete_ms_avg().get(":all-time");
- for (String key: ackedMap.keySet()) {
- if (failedMap != null) {
- Long tmp = failedMap.get(key);
- if (tmp != null) {
- failed += tmp;
- }
- }
- long ackVal = ackedMap.get(key);
- double latVal = avgLatMap.get(key) * ackVal;
- acked += ackVal;
- weightedAvgTotal += latVal;
- }
- }
- }
- double avgLatency = weightedAvgTotal/acked;
- System.out.println("uptime: "+uptime+" acked: "+acked+" avgLatency: "+avgLatency+" acked/sec: "+(((double)acked)/uptime+" failed: "+failed));
- }
-
- public static void kill(Nimbus.Client client, String name) throws Exception {
- KillOptions opts = new KillOptions();
- opts.set_wait_secs(0);
- client.killTopologyWithOpts(name, opts);
- }
-
- public static void main(String[] args) throws Exception {
-
- TopologyBuilder builder = new TopologyBuilder();
-
- builder.setSpout("spout", new InOrderSpout(), 8);
- builder.setBolt("count", new Check(), 8).fieldsGrouping("spout", new Fields("c1"));
-
- Config conf = new Config();
- conf.registerMetricsConsumer(backtype.storm.metric.LoggingMetricsConsumer.class);
-
- String name = "in-order-test";
- if (args != null && args.length > 0) {
- name = args[0];
- }
-
- conf.setNumWorkers(1);
- StormSubmitter.submitTopologyWithProgressBar(name, conf, builder.createTopology());
-
- Map clusterConf = Utils.readStormConfig();
- clusterConf.putAll(Utils.readCommandLineOpts());
- Nimbus.Client client = NimbusClient.getConfiguredClient(clusterConf).getClient();
-
- //Sleep for 50 mins
- for (int i = 0; i < 50; i++) {
- Thread.sleep(30 * 1000);
- printMetrics(client, name);
- }
- kill(client, name);
- }
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/examples/storm-starter/src/jvm/storm/starter/ManualDRPC.java
----------------------------------------------------------------------
diff --git a/examples/storm-starter/src/jvm/storm/starter/ManualDRPC.java b/examples/storm-starter/src/jvm/storm/starter/ManualDRPC.java
deleted file mode 100644
index fe0bae2..0000000
--- a/examples/storm-starter/src/jvm/storm/starter/ManualDRPC.java
+++ /dev/null
@@ -1,68 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package storm.starter;
-
-import backtype.storm.Config;
-import backtype.storm.LocalCluster;
-import backtype.storm.LocalDRPC;
-import backtype.storm.drpc.DRPCSpout;
-import backtype.storm.drpc.ReturnResults;
-import backtype.storm.topology.BasicOutputCollector;
-import backtype.storm.topology.OutputFieldsDeclarer;
-import backtype.storm.topology.TopologyBuilder;
-import backtype.storm.topology.base.BaseBasicBolt;
-import backtype.storm.tuple.Fields;
-import backtype.storm.tuple.Tuple;
-import backtype.storm.tuple.Values;
-
-
-public class ManualDRPC {
- public static class ExclamationBolt extends BaseBasicBolt {
-
- @Override
- public void declareOutputFields(OutputFieldsDeclarer declarer) {
- declarer.declare(new Fields("result", "return-info"));
- }
-
- @Override
- public void execute(Tuple tuple, BasicOutputCollector collector) {
- String arg = tuple.getString(0);
- Object retInfo = tuple.getValue(1);
- collector.emit(new Values(arg + "!!!", retInfo));
- }
-
- }
-
- public static void main(String[] args) {
- TopologyBuilder builder = new TopologyBuilder();
- LocalDRPC drpc = new LocalDRPC();
-
- DRPCSpout spout = new DRPCSpout("exclamation", drpc);
- builder.setSpout("drpc", spout);
- builder.setBolt("exclaim", new ExclamationBolt(), 3).shuffleGrouping("drpc");
- builder.setBolt("return", new ReturnResults(), 3).shuffleGrouping("exclaim");
-
- LocalCluster cluster = new LocalCluster();
- Config conf = new Config();
- cluster.submitTopology("exclaim", conf, builder.createTopology());
-
- System.out.println(drpc.execute("exclamation", "aaa"));
- System.out.println(drpc.execute("exclamation", "bbb"));
-
- }
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/examples/storm-starter/src/jvm/storm/starter/MultipleLoggerTopology.java
----------------------------------------------------------------------
diff --git a/examples/storm-starter/src/jvm/storm/starter/MultipleLoggerTopology.java b/examples/storm-starter/src/jvm/storm/starter/MultipleLoggerTopology.java
deleted file mode 100644
index 4285ff9..0000000
--- a/examples/storm-starter/src/jvm/storm/starter/MultipleLoggerTopology.java
+++ /dev/null
@@ -1,105 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package storm.starter;
-
-import backtype.storm.Config;
-import backtype.storm.LocalCluster;
-import backtype.storm.StormSubmitter;
-import backtype.storm.task.OutputCollector;
-import backtype.storm.task.TopologyContext;
-import backtype.storm.testing.TestWordSpout;
-import backtype.storm.topology.OutputFieldsDeclarer;
-import backtype.storm.topology.TopologyBuilder;
-import backtype.storm.topology.base.BaseRichBolt;
-import backtype.storm.tuple.Fields;
-import backtype.storm.tuple.Tuple;
-import backtype.storm.tuple.Values;
-import backtype.storm.utils.Utils;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.List;
-import java.util.Map;
-
-/**
- * This is a basic example of a Storm topology.
- */
-public class MultipleLoggerTopology {
- public static class ExclamationLoggingBolt extends BaseRichBolt {
- OutputCollector _collector;
- Logger _rootLogger = LoggerFactory.getLogger (Logger.ROOT_LOGGER_NAME);
- // ensure the loggers are configured in the worker.xml before
- // trying to use them here
- Logger _logger = LoggerFactory.getLogger ("com.myapp");
- Logger _subLogger = LoggerFactory.getLogger ("com.myapp.sub");
-
- @Override
- public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
- _collector = collector;
- }
-
- @Override
- public void execute(Tuple tuple) {
- _rootLogger.debug ("root: This is a DEBUG message");
- _rootLogger.info ("root: This is an INFO message");
- _rootLogger.warn ("root: This is a WARN message");
- _rootLogger.error ("root: This is an ERROR message");
-
- _logger.debug ("myapp: This is a DEBUG message");
- _logger.info ("myapp: This is an INFO message");
- _logger.warn ("myapp: This is a WARN message");
- _logger.error ("myapp: This is an ERROR message");
-
- _subLogger.debug ("myapp.sub: This is a DEBUG message");
- _subLogger.info ("myapp.sub: This is an INFO message");
- _subLogger.warn ("myapp.sub: This is a WARN message");
- _subLogger.error ("myapp.sub: This is an ERROR message");
-
- _collector.emit(tuple, new Values(tuple.getString(0) + "!!!"));
- _collector.ack(tuple);
- }
-
- @Override
- public void declareOutputFields(OutputFieldsDeclarer declarer) {
- declarer.declare(new Fields("word"));
- }
- }
-
- public static void main(String[] args) throws Exception {
- TopologyBuilder builder = new TopologyBuilder();
-
- builder.setSpout("word", new TestWordSpout(), 10);
- builder.setBolt("exclaim1", new ExclamationLoggingBolt(), 3).shuffleGrouping("word");
- builder.setBolt("exclaim2", new ExclamationLoggingBolt(), 2).shuffleGrouping("exclaim1");
-
- Config conf = new Config();
- conf.setDebug(true);
-
- if (args != null && args.length > 0) {
- conf.setNumWorkers(2);
- StormSubmitter.submitTopologyWithProgressBar(args[0], conf, builder.createTopology());
- } else {
- LocalCluster cluster = new LocalCluster();
- cluster.submitTopology("test", conf, builder.createTopology());
- Utils.sleep(10000);
- cluster.killTopology("test");
- cluster.shutdown();
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/examples/storm-starter/src/jvm/storm/starter/PrintSampleStream.java
----------------------------------------------------------------------
diff --git a/examples/storm-starter/src/jvm/storm/starter/PrintSampleStream.java b/examples/storm-starter/src/jvm/storm/starter/PrintSampleStream.java
deleted file mode 100644
index 021cc17..0000000
--- a/examples/storm-starter/src/jvm/storm/starter/PrintSampleStream.java
+++ /dev/null
@@ -1,58 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package storm.starter;
-
-import java.util.Arrays;
-
-import backtype.storm.Config;
-import backtype.storm.LocalCluster;
-import backtype.storm.topology.TopologyBuilder;
-import backtype.storm.utils.Utils;
-
-import storm.starter.bolt.PrinterBolt;
-import storm.starter.spout.TwitterSampleSpout;
-
-public class PrintSampleStream {
- public static void main(String[] args) {
- String consumerKey = args[0];
- String consumerSecret = args[1];
- String accessToken = args[2];
- String accessTokenSecret = args[3];
- String[] arguments = args.clone();
- String[] keyWords = Arrays.copyOfRange(arguments, 4, arguments.length);
-
- TopologyBuilder builder = new TopologyBuilder();
-
- builder.setSpout("twitter", new TwitterSampleSpout(consumerKey, consumerSecret,
- accessToken, accessTokenSecret, keyWords));
- builder.setBolt("print", new PrinterBolt())
- .shuffleGrouping("twitter");
-
-
- Config conf = new Config();
-
-
- LocalCluster cluster = new LocalCluster();
-
- cluster.submitTopology("test", conf, builder.createTopology());
-
- Utils.sleep(10000);
- cluster.shutdown();
- }
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/examples/storm-starter/src/jvm/storm/starter/ReachTopology.java
----------------------------------------------------------------------
diff --git a/examples/storm-starter/src/jvm/storm/starter/ReachTopology.java b/examples/storm-starter/src/jvm/storm/starter/ReachTopology.java
deleted file mode 100644
index 73ed45a..0000000
--- a/examples/storm-starter/src/jvm/storm/starter/ReachTopology.java
+++ /dev/null
@@ -1,196 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package storm.starter;
-
-import backtype.storm.Config;
-import backtype.storm.LocalCluster;
-import backtype.storm.LocalDRPC;
-import backtype.storm.StormSubmitter;
-import backtype.storm.coordination.BatchOutputCollector;
-import backtype.storm.drpc.LinearDRPCTopologyBuilder;
-import backtype.storm.task.TopologyContext;
-import backtype.storm.topology.BasicOutputCollector;
-import backtype.storm.topology.OutputFieldsDeclarer;
-import backtype.storm.topology.base.BaseBasicBolt;
-import backtype.storm.topology.base.BaseBatchBolt;
-import backtype.storm.tuple.Fields;
-import backtype.storm.tuple.Tuple;
-import backtype.storm.tuple.Values;
-
-import java.util.*;
-
-/**
- * This is a good example of doing complex Distributed RPC on top of Storm. This program creates a topology that can
- * compute the reach for any URL on Twitter in realtime by parallelizing the whole computation.
- * <p/>
- * Reach is the number of unique people exposed to a URL on Twitter. To compute reach, you have to get all the people
- * who tweeted the URL, get all the followers of all those people, unique that set of followers, and then count the
- * unique set. It's an intense computation that can involve thousands of database calls and tens of millions of follower
- * records.
- * <p/>
- * This Storm topology does every piece of that computation in parallel, turning what would be a computation that takes
- * minutes on a single machine into one that takes just a couple seconds.
- * <p/>
- * For the purposes of demonstration, this topology replaces the use of actual DBs with in-memory hashmaps.
- *
- * @see <a href="http://storm.apache.org/documentation/Distributed-RPC.html">Distributed RPC</a>
- */
-public class ReachTopology {
- public static Map<String, List<String>> TWEETERS_DB = new HashMap<String, List<String>>() {{
- put("foo.com/blog/1", Arrays.asList("sally", "bob", "tim", "george", "nathan"));
- put("engineering.twitter.com/blog/5", Arrays.asList("adam", "david", "sally", "nathan"));
- put("tech.backtype.com/blog/123", Arrays.asList("tim", "mike", "john"));
- }};
-
- public static Map<String, List<String>> FOLLOWERS_DB = new HashMap<String, List<String>>() {{
- put("sally", Arrays.asList("bob", "tim", "alice", "adam", "jim", "chris", "jai"));
- put("bob", Arrays.asList("sally", "nathan", "jim", "mary", "david", "vivian"));
- put("tim", Arrays.asList("alex"));
- put("nathan", Arrays.asList("sally", "bob", "adam", "harry", "chris", "vivian", "emily", "jordan"));
- put("adam", Arrays.asList("david", "carissa"));
- put("mike", Arrays.asList("john", "bob"));
- put("john", Arrays.asList("alice", "nathan", "jim", "mike", "bob"));
- }};
-
- public static class GetTweeters extends BaseBasicBolt {
- @Override
- public void execute(Tuple tuple, BasicOutputCollector collector) {
- Object id = tuple.getValue(0);
- String url = tuple.getString(1);
- List<String> tweeters = TWEETERS_DB.get(url);
- if (tweeters != null) {
- for (String tweeter : tweeters) {
- collector.emit(new Values(id, tweeter));
- }
- }
- }
-
- @Override
- public void declareOutputFields(OutputFieldsDeclarer declarer) {
- declarer.declare(new Fields("id", "tweeter"));
- }
- }
-
- public static class GetFollowers extends BaseBasicBolt {
- @Override
- public void execute(Tuple tuple, BasicOutputCollector collector) {
- Object id = tuple.getValue(0);
- String tweeter = tuple.getString(1);
- List<String> followers = FOLLOWERS_DB.get(tweeter);
- if (followers != null) {
- for (String follower : followers) {
- collector.emit(new Values(id, follower));
- }
- }
- }
-
- @Override
- public void declareOutputFields(OutputFieldsDeclarer declarer) {
- declarer.declare(new Fields("id", "follower"));
- }
- }
-
- public static class PartialUniquer extends BaseBatchBolt {
- BatchOutputCollector _collector;
- Object _id;
- Set<String> _followers = new HashSet<String>();
-
- @Override
- public void prepare(Map conf, TopologyContext context, BatchOutputCollector collector, Object id) {
- _collector = collector;
- _id = id;
- }
-
- @Override
- public void execute(Tuple tuple) {
- _followers.add(tuple.getString(1));
- }
-
- @Override
- public void finishBatch() {
- _collector.emit(new Values(_id, _followers.size()));
- }
-
- @Override
- public void declareOutputFields(OutputFieldsDeclarer declarer) {
- declarer.declare(new Fields("id", "partial-count"));
- }
- }
-
- public static class CountAggregator extends BaseBatchBolt {
- BatchOutputCollector _collector;
- Object _id;
- int _count = 0;
-
- @Override
- public void prepare(Map conf, TopologyContext context, BatchOutputCollector collector, Object id) {
- _collector = collector;
- _id = id;
- }
-
- @Override
- public void execute(Tuple tuple) {
- _count += tuple.getInteger(1);
- }
-
- @Override
- public void finishBatch() {
- _collector.emit(new Values(_id, _count));
- }
-
- @Override
- public void declareOutputFields(OutputFieldsDeclarer declarer) {
- declarer.declare(new Fields("id", "reach"));
- }
- }
-
- public static LinearDRPCTopologyBuilder construct() {
- LinearDRPCTopologyBuilder builder = new LinearDRPCTopologyBuilder("reach");
- builder.addBolt(new GetTweeters(), 4);
- builder.addBolt(new GetFollowers(), 12).shuffleGrouping();
- builder.addBolt(new PartialUniquer(), 6).fieldsGrouping(new Fields("id", "follower"));
- builder.addBolt(new CountAggregator(), 3).fieldsGrouping(new Fields("id"));
- return builder;
- }
-
- public static void main(String[] args) throws Exception {
- LinearDRPCTopologyBuilder builder = construct();
-
-
- Config conf = new Config();
-
- if (args == null || args.length == 0) {
- conf.setMaxTaskParallelism(3);
- LocalDRPC drpc = new LocalDRPC();
- LocalCluster cluster = new LocalCluster();
- cluster.submitTopology("reach-drpc", conf, builder.createLocalTopology(drpc));
-
- String[] urlsToTry = new String[]{ "foo.com/blog/1", "engineering.twitter.com/blog/5", "notaurl.com" };
- for (String url : urlsToTry) {
- System.out.println("Reach of " + url + ": " + drpc.execute("reach", url));
- }
-
- cluster.shutdown();
- drpc.shutdown();
- }
- else {
- conf.setNumWorkers(6);
- StormSubmitter.submitTopologyWithProgressBar(args[0], conf, builder.createRemoteTopology());
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/examples/storm-starter/src/jvm/storm/starter/ResourceAwareExampleTopology.java
----------------------------------------------------------------------
diff --git a/examples/storm-starter/src/jvm/storm/starter/ResourceAwareExampleTopology.java b/examples/storm-starter/src/jvm/storm/starter/ResourceAwareExampleTopology.java
deleted file mode 100644
index 0fb3724..0000000
--- a/examples/storm-starter/src/jvm/storm/starter/ResourceAwareExampleTopology.java
+++ /dev/null
@@ -1,106 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package storm.starter;
-
-import backtype.storm.Config;
-import backtype.storm.LocalCluster;
-import backtype.storm.StormSubmitter;
-import backtype.storm.task.OutputCollector;
-import backtype.storm.task.TopologyContext;
-import backtype.storm.testing.TestWordSpout;
-import backtype.storm.topology.BoltDeclarer;
-import backtype.storm.topology.OutputFieldsDeclarer;
-import backtype.storm.topology.SpoutDeclarer;
-import backtype.storm.topology.TopologyBuilder;
-import backtype.storm.topology.base.BaseRichBolt;
-import backtype.storm.tuple.Fields;
-import backtype.storm.tuple.Tuple;
-import backtype.storm.tuple.Values;
-import backtype.storm.utils.Utils;
-
-import java.util.Map;
-
-public class ResourceAwareExampleTopology {
- public static class ExclamationBolt extends BaseRichBolt {
- OutputCollector _collector;
-
- @Override
- public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
- _collector = collector;
- }
-
- @Override
- public void execute(Tuple tuple) {
- _collector.emit(tuple, new Values(tuple.getString(0) + "!!!"));
- _collector.ack(tuple);
- }
-
- @Override
- public void declareOutputFields(OutputFieldsDeclarer declarer) {
- declarer.declare(new Fields("word"));
- }
- }
-
- public static void main(String[] args) throws Exception {
- TopologyBuilder builder = new TopologyBuilder();
-
- SpoutDeclarer spout = builder.setSpout("word", new TestWordSpout(), 10);
- //set cpu requirement
- spout.setCPULoad(20);
- //set onheap and offheap memory requirement
- spout.setMemoryLoad(64, 16);
-
- BoltDeclarer bolt1 = builder.setBolt("exclaim1", new ExclamationBolt(), 3).shuffleGrouping("word");
- //sets cpu requirement. Not neccessary to set both CPU and memory.
- //For requirements not set, a default value will be used
- bolt1.setCPULoad(15);
-
- BoltDeclarer bolt2 = builder.setBolt("exclaim2", new ExclamationBolt(), 2).shuffleGrouping("exclaim1");
- bolt2.setMemoryLoad(100);
-
- Config conf = new Config();
- conf.setDebug(true);
-
- /**
- * Use to limit the maximum amount of memory (in MB) allocated to one worker process.
- * Can be used to spread executors to to multiple workers
- */
- conf.setTopologyWorkerMaxHeapSize(1024.0);
-
- //topology priority describing the importance of the topology in decreasing importance starting from 0 (i.e. 0 is the highest priority and the priority importance decreases as the priority number increases).
- //Recommended range of 0-29 but no hard limit set.
- conf.setTopologyPriority(29);
-
- // Set strategy to schedule topology. If not specified, default to backtype.storm.scheduler.resource.strategies.scheduling.DefaultResourceAwareStrategy
- conf.setTopologyStrategy(backtype.storm.scheduler.resource.strategies.scheduling.DefaultResourceAwareStrategy.class);
-
- if (args != null && args.length > 0) {
- conf.setNumWorkers(3);
-
- StormSubmitter.submitTopologyWithProgressBar(args[0], conf, builder.createTopology());
- }
- else {
-
- LocalCluster cluster = new LocalCluster();
- cluster.submitTopology("test", conf, builder.createTopology());
- Utils.sleep(10000);
- cluster.killTopology("test");
- cluster.shutdown();
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/examples/storm-starter/src/jvm/storm/starter/RollingTopWords.java
----------------------------------------------------------------------
diff --git a/examples/storm-starter/src/jvm/storm/starter/RollingTopWords.java b/examples/storm-starter/src/jvm/storm/starter/RollingTopWords.java
deleted file mode 100644
index 762c22a..0000000
--- a/examples/storm-starter/src/jvm/storm/starter/RollingTopWords.java
+++ /dev/null
@@ -1,130 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package storm.starter;
-
-import backtype.storm.Config;
-import backtype.storm.testing.TestWordSpout;
-import backtype.storm.topology.TopologyBuilder;
-import backtype.storm.tuple.Fields;
-import org.apache.log4j.Logger;
-import storm.starter.bolt.IntermediateRankingsBolt;
-import storm.starter.bolt.RollingCountBolt;
-import storm.starter.bolt.TotalRankingsBolt;
-import storm.starter.util.StormRunner;
-
-/**
- * This topology does a continuous computation of the top N words that the topology has seen in terms of cardinality.
- * The top N computation is done in a completely scalable way, and a similar approach could be used to compute things
- * like trending topics or trending images on Twitter.
- */
-public class RollingTopWords {
-
- private static final Logger LOG = Logger.getLogger(RollingTopWords.class);
- private static final int DEFAULT_RUNTIME_IN_SECONDS = 60;
- private static final int TOP_N = 5;
-
- private final TopologyBuilder builder;
- private final String topologyName;
- private final Config topologyConfig;
- private final int runtimeInSeconds;
-
- public RollingTopWords(String topologyName) throws InterruptedException {
- builder = new TopologyBuilder();
- this.topologyName = topologyName;
- topologyConfig = createTopologyConfiguration();
- runtimeInSeconds = DEFAULT_RUNTIME_IN_SECONDS;
-
- wireTopology();
- }
-
- private static Config createTopologyConfiguration() {
- Config conf = new Config();
- conf.setDebug(true);
- return conf;
- }
-
- private void wireTopology() throws InterruptedException {
- String spoutId = "wordGenerator";
- String counterId = "counter";
- String intermediateRankerId = "intermediateRanker";
- String totalRankerId = "finalRanker";
- builder.setSpout(spoutId, new TestWordSpout(), 5);
- builder.setBolt(counterId, new RollingCountBolt(9, 3), 4).fieldsGrouping(spoutId, new Fields("word"));
- builder.setBolt(intermediateRankerId, new IntermediateRankingsBolt(TOP_N), 4).fieldsGrouping(counterId, new Fields(
- "obj"));
- builder.setBolt(totalRankerId, new TotalRankingsBolt(TOP_N)).globalGrouping(intermediateRankerId);
- }
-
- public void runLocally() throws InterruptedException {
- StormRunner.runTopologyLocally(builder.createTopology(), topologyName, topologyConfig, runtimeInSeconds);
- }
-
- public void runRemotely() throws Exception {
- StormRunner.runTopologyRemotely(builder.createTopology(), topologyName, topologyConfig);
- }
-
- /**
- * Submits (runs) the topology.
- *
- * Usage: "RollingTopWords [topology-name] [local|remote]"
- *
- * By default, the topology is run locally under the name "slidingWindowCounts".
- *
- * Examples:
- *
- * ```
- *
- * # Runs in local mode (LocalCluster), with topology name "slidingWindowCounts"
- * $ storm jar storm-starter-jar-with-dependencies.jar storm.starter.RollingTopWords
- *
- * # Runs in local mode (LocalCluster), with topology name "foobar"
- * $ storm jar storm-starter-jar-with-dependencies.jar storm.starter.RollingTopWords foobar
- *
- * # Runs in local mode (LocalCluster), with topology name "foobar"
- * $ storm jar storm-starter-jar-with-dependencies.jar storm.starter.RollingTopWords foobar local
- *
- * # Runs in remote/cluster mode, with topology name "production-topology"
- * $ storm jar storm-starter-jar-with-dependencies.jar storm.starter.RollingTopWords production-topology remote
- * ```
- *
- * @param args First positional argument (optional) is topology name, second positional argument (optional) defines
- * whether to run the topology locally ("local") or remotely, i.e. on a real cluster ("remote").
- * @throws Exception
- */
- public static void main(String[] args) throws Exception {
- String topologyName = "slidingWindowCounts";
- if (args.length >= 1) {
- topologyName = args[0];
- }
- boolean runLocally = true;
- if (args.length >= 2 && args[1].equalsIgnoreCase("remote")) {
- runLocally = false;
- }
-
- LOG.info("Topology name: " + topologyName);
- RollingTopWords rtw = new RollingTopWords(topologyName);
- if (runLocally) {
- LOG.info("Running in local mode");
- rtw.runLocally();
- }
- else {
- LOG.info("Running in remote (cluster) mode");
- rtw.runRemotely();
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/examples/storm-starter/src/jvm/storm/starter/SingleJoinExample.java
----------------------------------------------------------------------
diff --git a/examples/storm-starter/src/jvm/storm/starter/SingleJoinExample.java b/examples/storm-starter/src/jvm/storm/starter/SingleJoinExample.java
deleted file mode 100644
index cb1d98c..0000000
--- a/examples/storm-starter/src/jvm/storm/starter/SingleJoinExample.java
+++ /dev/null
@@ -1,64 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package storm.starter;
-
-import backtype.storm.Config;
-import backtype.storm.LocalCluster;
-import backtype.storm.testing.FeederSpout;
-import backtype.storm.topology.TopologyBuilder;
-import backtype.storm.tuple.Fields;
-import backtype.storm.tuple.Values;
-import backtype.storm.utils.Utils;
-import storm.starter.bolt.SingleJoinBolt;
-
-public class SingleJoinExample {
- public static void main(String[] args) {
- FeederSpout genderSpout = new FeederSpout(new Fields("id", "gender"));
- FeederSpout ageSpout = new FeederSpout(new Fields("id", "age"));
-
- TopologyBuilder builder = new TopologyBuilder();
- builder.setSpout("gender", genderSpout);
- builder.setSpout("age", ageSpout);
- builder.setBolt("join", new SingleJoinBolt(new Fields("gender", "age"))).fieldsGrouping("gender", new Fields("id"))
- .fieldsGrouping("age", new Fields("id"));
-
- Config conf = new Config();
- conf.setDebug(true);
-
- LocalCluster cluster = new LocalCluster();
- cluster.submitTopology("join-example", conf, builder.createTopology());
-
- for (int i = 0; i < 10; i++) {
- String gender;
- if (i % 2 == 0) {
- gender = "male";
- }
- else {
- gender = "female";
- }
- genderSpout.feed(new Values(i, gender));
- }
-
- for (int i = 9; i >= 0; i--) {
- ageSpout.feed(new Values(i, i + 20));
- }
-
- Utils.sleep(2000);
- cluster.shutdown();
- }
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/examples/storm-starter/src/jvm/storm/starter/SkewedRollingTopWords.java
----------------------------------------------------------------------
diff --git a/examples/storm-starter/src/jvm/storm/starter/SkewedRollingTopWords.java b/examples/storm-starter/src/jvm/storm/starter/SkewedRollingTopWords.java
deleted file mode 100644
index 443c051..0000000
--- a/examples/storm-starter/src/jvm/storm/starter/SkewedRollingTopWords.java
+++ /dev/null
@@ -1,132 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package storm.starter;
-
-import backtype.storm.Config;
-import backtype.storm.testing.TestWordSpout;
-import backtype.storm.topology.TopologyBuilder;
-import backtype.storm.tuple.Fields;
-import org.apache.log4j.Logger;
-import storm.starter.bolt.IntermediateRankingsBolt;
-import storm.starter.bolt.RollingCountBolt;
-import storm.starter.bolt.RollingCountAggBolt;
-import storm.starter.bolt.TotalRankingsBolt;
-import storm.starter.util.StormRunner;
-
-/**
- * This topology does a continuous computation of the top N words that the topology has seen in terms of cardinality.
- * The top N computation is done in a completely scalable way, and a similar approach could be used to compute things
- * like trending topics or trending images on Twitter. It takes an approach that assumes that some works will be much
- * more common then other words, and uses partialKeyGrouping to better balance the skewed load.
- */
-public class SkewedRollingTopWords {
- private static final Logger LOG = Logger.getLogger(SkewedRollingTopWords.class);
- private static final int DEFAULT_RUNTIME_IN_SECONDS = 60;
- private static final int TOP_N = 5;
-
- private final TopologyBuilder builder;
- private final String topologyName;
- private final Config topologyConfig;
- private final int runtimeInSeconds;
-
- public SkewedRollingTopWords(String topologyName) throws InterruptedException {
- builder = new TopologyBuilder();
- this.topologyName = topologyName;
- topologyConfig = createTopologyConfiguration();
- runtimeInSeconds = DEFAULT_RUNTIME_IN_SECONDS;
-
- wireTopology();
- }
-
- private static Config createTopologyConfiguration() {
- Config conf = new Config();
- conf.setDebug(true);
- return conf;
- }
-
- private void wireTopology() throws InterruptedException {
- String spoutId = "wordGenerator";
- String counterId = "counter";
- String aggId = "aggregator";
- String intermediateRankerId = "intermediateRanker";
- String totalRankerId = "finalRanker";
- builder.setSpout(spoutId, new TestWordSpout(), 5);
- builder.setBolt(counterId, new RollingCountBolt(9, 3), 4).partialKeyGrouping(spoutId, new Fields("word"));
- builder.setBolt(aggId, new RollingCountAggBolt(), 4).fieldsGrouping(counterId, new Fields("obj"));
- builder.setBolt(intermediateRankerId, new IntermediateRankingsBolt(TOP_N), 4).fieldsGrouping(aggId, new Fields("obj"));
- builder.setBolt(totalRankerId, new TotalRankingsBolt(TOP_N)).globalGrouping(intermediateRankerId);
- }
-
- public void runLocally() throws InterruptedException {
- StormRunner.runTopologyLocally(builder.createTopology(), topologyName, topologyConfig, runtimeInSeconds);
- }
-
- public void runRemotely() throws Exception {
- StormRunner.runTopologyRemotely(builder.createTopology(), topologyName, topologyConfig);
- }
-
- /**
- * Submits (runs) the topology.
- *
- * Usage: "RollingTopWords [topology-name] [local|remote]"
- *
- * By default, the topology is run locally under the name "slidingWindowCounts".
- *
- * Examples:
- *
- * ```
- *
- * # Runs in local mode (LocalCluster), with topology name "slidingWindowCounts"
- * $ storm jar storm-starter-jar-with-dependencies.jar storm.starter.RollingTopWords
- *
- * # Runs in local mode (LocalCluster), with topology name "foobar"
- * $ storm jar storm-starter-jar-with-dependencies.jar storm.starter.RollingTopWords foobar
- *
- * # Runs in local mode (LocalCluster), with topology name "foobar"
- * $ storm jar storm-starter-jar-with-dependencies.jar storm.starter.RollingTopWords foobar local
- *
- * # Runs in remote/cluster mode, with topology name "production-topology"
- * $ storm jar storm-starter-jar-with-dependencies.jar storm.starter.RollingTopWords production-topology remote
- * ```
- *
- * @param args First positional argument (optional) is topology name, second positional argument (optional) defines
- * whether to run the topology locally ("local") or remotely, i.e. on a real cluster ("remote").
- * @throws Exception
- */
- public static void main(String[] args) throws Exception {
- String topologyName = "slidingWindowCounts";
- if (args.length >= 1) {
- topologyName = args[0];
- }
- boolean runLocally = true;
- if (args.length >= 2 && args[1].equalsIgnoreCase("remote")) {
- runLocally = false;
- }
-
- LOG.info("Topology name: " + topologyName);
- SkewedRollingTopWords rtw = new SkewedRollingTopWords(topologyName);
- if (runLocally) {
- LOG.info("Running in local mode");
- rtw.runLocally();
- }
- else {
- LOG.info("Running in remote (cluster) mode");
- rtw.runRemotely();
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/examples/storm-starter/src/jvm/storm/starter/SlidingTupleTsTopology.java
----------------------------------------------------------------------
diff --git a/examples/storm-starter/src/jvm/storm/starter/SlidingTupleTsTopology.java b/examples/storm-starter/src/jvm/storm/starter/SlidingTupleTsTopology.java
deleted file mode 100644
index 598335d..0000000
--- a/examples/storm-starter/src/jvm/storm/starter/SlidingTupleTsTopology.java
+++ /dev/null
@@ -1,62 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package storm.starter;
-
-import backtype.storm.Config;
-import backtype.storm.LocalCluster;
-import backtype.storm.StormSubmitter;
-import backtype.storm.topology.TopologyBuilder;
-import backtype.storm.topology.base.BaseWindowedBolt;
-import backtype.storm.utils.Utils;
-import storm.starter.bolt.PrinterBolt;
-import storm.starter.bolt.SlidingWindowSumBolt;
-import storm.starter.spout.RandomIntegerSpout;
-
-import java.util.concurrent.TimeUnit;
-
-import static backtype.storm.topology.base.BaseWindowedBolt.Duration;
-
-/**
- * Windowing based on tuple timestamp (e.g. the time when tuple is generated
- * rather than when its processed).
- */
-public class SlidingTupleTsTopology {
- public static void main(String[] args) throws Exception {
- TopologyBuilder builder = new TopologyBuilder();
- BaseWindowedBolt bolt = new SlidingWindowSumBolt()
- .withWindow(new Duration(5, TimeUnit.SECONDS), new Duration(3, TimeUnit.SECONDS))
- .withTimestampField("ts")
- .withLag(new Duration(5, TimeUnit.SECONDS));
- builder.setSpout("integer", new RandomIntegerSpout(), 1);
- builder.setBolt("slidingsum", bolt, 1).shuffleGrouping("integer");
- builder.setBolt("printer", new PrinterBolt(), 1).shuffleGrouping("slidingsum");
- Config conf = new Config();
- conf.setDebug(true);
-
- if (args != null && args.length > 0) {
- conf.setNumWorkers(1);
- StormSubmitter.submitTopologyWithProgressBar(args[0], conf, builder.createTopology());
- } else {
- LocalCluster cluster = new LocalCluster();
- cluster.submitTopology("test", conf, builder.createTopology());
- Utils.sleep(40000);
- cluster.killTopology("test");
- cluster.shutdown();
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/examples/storm-starter/src/jvm/storm/starter/SlidingWindowTopology.java
----------------------------------------------------------------------
diff --git a/examples/storm-starter/src/jvm/storm/starter/SlidingWindowTopology.java b/examples/storm-starter/src/jvm/storm/starter/SlidingWindowTopology.java
deleted file mode 100644
index 5031f8d..0000000
--- a/examples/storm-starter/src/jvm/storm/starter/SlidingWindowTopology.java
+++ /dev/null
@@ -1,108 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package storm.starter;
-
-import backtype.storm.Config;
-import backtype.storm.LocalCluster;
-import backtype.storm.StormSubmitter;
-import backtype.storm.task.OutputCollector;
-import backtype.storm.task.TopologyContext;
-import backtype.storm.topology.OutputFieldsDeclarer;
-import backtype.storm.topology.TopologyBuilder;
-import backtype.storm.topology.base.BaseWindowedBolt;
-import backtype.storm.tuple.Fields;
-import backtype.storm.tuple.Tuple;
-import backtype.storm.tuple.Values;
-import backtype.storm.utils.Utils;
-import backtype.storm.windowing.TupleWindow;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import storm.starter.bolt.PrinterBolt;
-import storm.starter.bolt.SlidingWindowSumBolt;
-import storm.starter.spout.RandomIntegerSpout;
-
-import java.util.List;
-import java.util.Map;
-
-import static backtype.storm.topology.base.BaseWindowedBolt.Count;
-
-/**
- * A sample topology that demonstrates the usage of {@link backtype.storm.topology.IWindowedBolt}
- * to calculate sliding window sum.
- */
-public class SlidingWindowTopology {
-
- private static final Logger LOG = LoggerFactory.getLogger(SlidingWindowTopology.class);
-
- /*
- * Computes tumbling window average
- */
- private static class TumblingWindowAvgBolt extends BaseWindowedBolt {
- private OutputCollector collector;
-
- @Override
- public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
- this.collector = collector;
- }
-
- @Override
- public void execute(TupleWindow inputWindow) {
- int sum = 0;
- List<Tuple> tuplesInWindow = inputWindow.get();
- LOG.debug("Events in current window: " + tuplesInWindow.size());
- if (tuplesInWindow.size() > 0) {
- /*
- * Since this is a tumbling window calculation,
- * we use all the tuples in the window to compute the avg.
- */
- for (Tuple tuple : tuplesInWindow) {
- sum += (int) tuple.getValue(0);
- }
- collector.emit(new Values(sum / tuplesInWindow.size()));
- }
- }
-
- @Override
- public void declareOutputFields(OutputFieldsDeclarer declarer) {
- declarer.declare(new Fields("avg"));
- }
- }
-
-
- public static void main(String[] args) throws Exception {
- TopologyBuilder builder = new TopologyBuilder();
- builder.setSpout("integer", new RandomIntegerSpout(), 1);
- builder.setBolt("slidingsum", new SlidingWindowSumBolt().withWindow(new Count(30), new Count(10)), 1)
- .shuffleGrouping("integer");
- builder.setBolt("tumblingavg", new TumblingWindowAvgBolt().withTumblingWindow(new Count(3)), 1)
- .shuffleGrouping("slidingsum");
- builder.setBolt("printer", new PrinterBolt(), 1).shuffleGrouping("tumblingavg");
- Config conf = new Config();
- conf.setDebug(true);
- if (args != null && args.length > 0) {
- conf.setNumWorkers(1);
- StormSubmitter.submitTopologyWithProgressBar(args[0], conf, builder.createTopology());
- } else {
- LocalCluster cluster = new LocalCluster();
- cluster.submitTopology("test", conf, builder.createTopology());
- Utils.sleep(40000);
- cluster.killTopology("test");
- cluster.shutdown();
- }
- }
-}