You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by bo...@apache.org on 2016/01/11 21:57:43 UTC

[47/53] [abbrv] [partial] storm git commit: STORM-1202: Migrate APIs to org.apache.storm, but try to provide some form of backwards compatability

http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/examples/storm-starter/src/jvm/storm/starter/BlobStoreAPIWordCountTopology.java
----------------------------------------------------------------------
diff --git a/examples/storm-starter/src/jvm/storm/starter/BlobStoreAPIWordCountTopology.java b/examples/storm-starter/src/jvm/storm/starter/BlobStoreAPIWordCountTopology.java
deleted file mode 100644
index 250c418..0000000
--- a/examples/storm-starter/src/jvm/storm/starter/BlobStoreAPIWordCountTopology.java
+++ /dev/null
@@ -1,304 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package storm.starter;
-
-import backtype.storm.Config;
-import backtype.storm.StormSubmitter;
-import backtype.storm.blobstore.AtomicOutputStream;
-import backtype.storm.blobstore.ClientBlobStore;
-import backtype.storm.blobstore.InputStreamWithMeta;
-import backtype.storm.blobstore.NimbusBlobStore;
-
-import backtype.storm.generated.AccessControl;
-import backtype.storm.generated.AccessControlType;
-import backtype.storm.generated.AlreadyAliveException;
-import backtype.storm.generated.AuthorizationException;
-import backtype.storm.generated.InvalidTopologyException;
-import backtype.storm.generated.KeyAlreadyExistsException;
-import backtype.storm.generated.KeyNotFoundException;
-import backtype.storm.generated.SettableBlobMeta;
-import backtype.storm.spout.SpoutOutputCollector;
-import backtype.storm.task.ShellBolt;
-import backtype.storm.task.TopologyContext;
-import backtype.storm.topology.BasicOutputCollector;
-import backtype.storm.topology.IRichBolt;
-import backtype.storm.topology.OutputFieldsDeclarer;
-import backtype.storm.topology.TopologyBuilder;
-import backtype.storm.topology.base.BaseBasicBolt;
-import backtype.storm.topology.base.BaseRichSpout;
-import backtype.storm.blobstore.BlobStoreAclHandler;
-import backtype.storm.tuple.Fields;
-import backtype.storm.tuple.Tuple;
-import backtype.storm.tuple.Values;
-import backtype.storm.utils.Utils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.BufferedReader;
-import java.io.BufferedWriter;
-import java.io.File;
-import java.io.FileReader;
-import java.io.FileWriter;
-import java.io.IOException;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.Random;
-import java.util.Set;
-import java.util.StringTokenizer;
-
-public class BlobStoreAPIWordCountTopology {
-    private static ClientBlobStore store; // Client API to invoke blob store API functionality
-    private static String key = "key";
-    private static String fileName = "blacklist.txt";
-    private static final Logger LOG = LoggerFactory.getLogger(BlobStoreAPIWordCountTopology.class);
-
-    public static void prepare() {
-        Config conf = new Config();
-        conf.putAll(Utils.readStormConfig());
-        store = Utils.getClientBlobStore(conf);
-    }
-
-    // Spout implementation
-    public static class RandomSentenceSpout extends BaseRichSpout {
-        SpoutOutputCollector _collector;
-
-        @Override
-        public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
-            _collector = collector;
-        }
-
-        @Override
-        public void nextTuple() {
-            Utils.sleep(100);
-            _collector.emit(new Values(getRandomSentence()));
-        }
-
-        @Override
-        public void ack(Object id) {
-        }
-
-        @Override
-        public void fail(Object id) {
-        }
-
-        @Override
-        public void declareOutputFields(OutputFieldsDeclarer declarer) {
-            declarer.declare(new Fields("sentence"));
-        }
-
-    }
-
-    // Bolt implementation
-    public static class SplitSentence extends ShellBolt implements IRichBolt {
-
-        public SplitSentence() {
-            super("python", "splitsentence.py");
-        }
-
-        @Override
-        public void declareOutputFields(OutputFieldsDeclarer declarer) {
-            declarer.declare(new Fields("word"));
-        }
-
-        @Override
-        public Map<String, Object> getComponentConfiguration() {
-            return null;
-        }
-    }
-
-    public static class FilterWords extends BaseBasicBolt {
-        boolean poll = false;
-        long pollTime;
-        Set<String> wordSet;
-        @Override
-        public void execute(Tuple tuple, BasicOutputCollector collector) {
-            String word = tuple.getString(0);
-            // Thread Polling every 5 seconds to update the wordSet seconds which is
-            // used in FilterWords bolt to filter the words
-            try {
-                if (!poll) {
-                    wordSet = parseFile(fileName);
-                    pollTime = System.currentTimeMillis();
-                    poll = true;
-                } else {
-                    if ((System.currentTimeMillis() - pollTime) > 5000) {
-                        wordSet = parseFile(fileName);
-                        pollTime = System.currentTimeMillis();
-                    }
-                }
-            } catch (IOException exp) {
-                throw new RuntimeException(exp);
-            }
-            if (wordSet !=null && !wordSet.contains(word)) {
-                collector.emit(new Values(word));
-            }
-        }
-
-        @Override
-        public void declareOutputFields(OutputFieldsDeclarer declarer) {
-            declarer.declare(new Fields("word"));
-        }
-    }
-
-    public void buildAndLaunchWordCountTopology(String[] args) {
-        TopologyBuilder builder = new TopologyBuilder();
-        builder.setSpout("spout", new RandomSentenceSpout(), 5);
-        builder.setBolt("split", new SplitSentence(), 8).shuffleGrouping("spout");
-        builder.setBolt("filter", new FilterWords(), 6).shuffleGrouping("split");
-
-        Config conf = new Config();
-        conf.setDebug(true);
-        try {
-            conf.setNumWorkers(3);
-            StormSubmitter.submitTopologyWithProgressBar(args[0], conf, builder.createTopology());
-        } catch (InvalidTopologyException | AuthorizationException | AlreadyAliveException exp) {
-            throw new RuntimeException(exp);
-        }
-    }
-
-    // Equivalent create command on command line
-    // storm blobstore create --file blacklist.txt --acl o::rwa key
-    private static void createBlobWithContent(String blobKey, ClientBlobStore clientBlobStore, File file)
-            throws AuthorizationException, KeyAlreadyExistsException, IOException,KeyNotFoundException {
-        String stringBlobACL = "o::rwa";
-        AccessControl blobACL = BlobStoreAclHandler.parseAccessControl(stringBlobACL);
-        List<AccessControl> acls = new LinkedList<AccessControl>();
-        acls.add(blobACL); // more ACLs can be added here
-        SettableBlobMeta settableBlobMeta = new SettableBlobMeta(acls);
-        AtomicOutputStream blobStream = clientBlobStore.createBlob(blobKey,settableBlobMeta);
-        blobStream.write(readFile(file).toString().getBytes());
-        blobStream.close();
-    }
-
-    // Equivalent update command on command line
-    // storm blobstore update --file blacklist.txt key
-    private static void updateBlobWithContent(String blobKey, ClientBlobStore clientBlobStore, File file)
-            throws KeyNotFoundException, AuthorizationException, IOException {
-        AtomicOutputStream blobOutputStream = clientBlobStore.updateBlob(blobKey);
-        blobOutputStream.write(readFile(file).toString().getBytes());
-        blobOutputStream.close();
-    }
-
-    private static String getRandomSentence() {
-        String[] sentences = new String[]{ "the cow jumped over the moon", "an apple a day keeps the doctor away",
-                "four score and seven years ago", "snow white and the seven dwarfs", "i am at two with nature" };
-        String sentence = sentences[new Random().nextInt(sentences.length)];
-        return sentence;
-    }
-
-    private static Set<String> getRandomWordSet() {
-        Set<String> randomWordSet = new HashSet<>();
-        Random random = new Random();
-        String[] words = new String[]{ "cow", "jumped", "over", "the", "moon", "apple", "day", "doctor", "away",
-                "four", "seven", "ago", "snow", "white", "seven", "dwarfs", "nature", "two" };
-        // Choosing atmost 5 words to update the blacklist file for filtering
-        for (int i=0; i<5; i++) {
-            randomWordSet.add(words[random.nextInt(words.length)]);
-        }
-        return randomWordSet;
-    }
-
-    private static Set<String> parseFile(String fileName) throws IOException {
-        File file = new File(fileName);
-        Set<String> wordSet = new HashSet<>();
-        if (!file.exists()) {
-            return wordSet;
-        }
-        StringTokenizer tokens = new StringTokenizer(readFile(file).toString(), "\r\n");
-        while (tokens.hasMoreElements()) {
-            wordSet.add(tokens.nextToken());
-        }
-        LOG.debug("parseFile {}", wordSet);
-        return wordSet;
-    }
-
-    private static StringBuilder readFile(File file) throws IOException {
-        String line;
-        StringBuilder fileContent = new StringBuilder();
-        // Do not use canonical file name here as we are using
-        // symbolic links to read file data and performing atomic move
-        // while updating files
-        BufferedReader br = new BufferedReader(new FileReader(file));
-        while ((line = br.readLine()) != null) {
-            fileContent.append(line);
-            fileContent.append(System.lineSeparator());
-        }
-        return fileContent;
-    }
-
-    // Creating a blacklist file to read from the disk
-    public static File createFile(String fileName) throws IOException {
-        File file = null;
-        file = new File(fileName);
-        if (!file.exists()) {
-            file.createNewFile();
-        }
-        writeToFile(file, getRandomWordSet());
-        return file;
-    }
-
-    // Updating a blacklist file periodically with random words
-    public static File updateFile(File file) throws IOException {
-        writeToFile(file, getRandomWordSet());
-        return file;
-    }
-
-    // Writing random words to be blacklisted
-    public static void writeToFile(File file, Set<String> content) throws IOException{
-        FileWriter fw = new FileWriter(file, false);
-        BufferedWriter bw = new BufferedWriter(fw);
-        Iterator<String> iter = content.iterator();
-        while(iter.hasNext()) {
-            bw.write(iter.next());
-            bw.write(System.lineSeparator());
-        }
-        bw.close();
-    }
-
-    public static void main(String[] args) {
-        prepare();
-        BlobStoreAPIWordCountTopology wc = new BlobStoreAPIWordCountTopology();
-        try {
-            File file = createFile(fileName);
-            // Creating blob again before launching topology
-            createBlobWithContent(key, store, file);
-
-            // Blostore launch command with topology blobstore map
-            // Here we are giving it a local name so that we can read from the file
-            // bin/storm jar examples/storm-starter/storm-starter-topologies-0.11.0-SNAPSHOT.jar
-            // storm.starter.BlobStoreAPIWordCountTopology bl -c
-            // topology.blobstore.map='{"key":{"localname":"blacklist.txt", "uncompress":"false"}}'
-            wc.buildAndLaunchWordCountTopology(args);
-
-            // Updating file few times every 5 seconds
-            for(int i=0; i<10; i++) {
-                updateBlobWithContent(key, store, updateFile(file));
-                Utils.sleep(5000);
-            }
-        } catch (KeyAlreadyExistsException kae) {
-            LOG.info("Key already exists {}", kae);
-        } catch (AuthorizationException | KeyNotFoundException | IOException exp) {
-            throw new RuntimeException(exp);
-        }
-    }
-}
-
-

http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/examples/storm-starter/src/jvm/storm/starter/ExclamationTopology.java
----------------------------------------------------------------------
diff --git a/examples/storm-starter/src/jvm/storm/starter/ExclamationTopology.java b/examples/storm-starter/src/jvm/storm/starter/ExclamationTopology.java
deleted file mode 100644
index d7b1b3e..0000000
--- a/examples/storm-starter/src/jvm/storm/starter/ExclamationTopology.java
+++ /dev/null
@@ -1,87 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package storm.starter;
-
-import backtype.storm.Config;
-import backtype.storm.LocalCluster;
-import backtype.storm.StormSubmitter;
-import backtype.storm.task.OutputCollector;
-import backtype.storm.task.TopologyContext;
-import backtype.storm.testing.TestWordSpout;
-import backtype.storm.topology.OutputFieldsDeclarer;
-import backtype.storm.topology.TopologyBuilder;
-import backtype.storm.topology.base.BaseRichBolt;
-import backtype.storm.tuple.Fields;
-import backtype.storm.tuple.Tuple;
-import backtype.storm.tuple.Values;
-import backtype.storm.utils.Utils;
-
-import java.util.Map;
-
-/**
- * This is a basic example of a Storm topology.
- */
-public class ExclamationTopology {
-
-  public static class ExclamationBolt extends BaseRichBolt {
-    OutputCollector _collector;
-
-    @Override
-    public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
-      _collector = collector;
-    }
-
-    @Override
-    public void execute(Tuple tuple) {
-      _collector.emit(tuple, new Values(tuple.getString(0) + "!!!"));
-      _collector.ack(tuple);
-    }
-
-    @Override
-    public void declareOutputFields(OutputFieldsDeclarer declarer) {
-      declarer.declare(new Fields("word"));
-    }
-
-
-  }
-
-  public static void main(String[] args) throws Exception {
-    TopologyBuilder builder = new TopologyBuilder();
-
-    builder.setSpout("word", new TestWordSpout(), 10);
-    builder.setBolt("exclaim1", new ExclamationBolt(), 3).shuffleGrouping("word");
-    builder.setBolt("exclaim2", new ExclamationBolt(), 2).shuffleGrouping("exclaim1");
-
-    Config conf = new Config();
-    conf.setDebug(true);
-
-    if (args != null && args.length > 0) {
-      conf.setNumWorkers(3);
-
-      StormSubmitter.submitTopologyWithProgressBar(args[0], conf, builder.createTopology());
-    }
-    else {
-
-      LocalCluster cluster = new LocalCluster();
-      cluster.submitTopology("test", conf, builder.createTopology());
-      Utils.sleep(10000);
-      cluster.killTopology("test");
-      cluster.shutdown();
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/examples/storm-starter/src/jvm/storm/starter/FastWordCountTopology.java
----------------------------------------------------------------------
diff --git a/examples/storm-starter/src/jvm/storm/starter/FastWordCountTopology.java b/examples/storm-starter/src/jvm/storm/starter/FastWordCountTopology.java
deleted file mode 100644
index 8f78abd..0000000
--- a/examples/storm-starter/src/jvm/storm/starter/FastWordCountTopology.java
+++ /dev/null
@@ -1,198 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package storm.starter;
-
-import backtype.storm.Config;
-import backtype.storm.StormSubmitter;
-import backtype.storm.generated.*;
-import backtype.storm.spout.SpoutOutputCollector;
-import backtype.storm.task.TopologyContext;
-import backtype.storm.topology.BasicOutputCollector;
-import backtype.storm.topology.IRichBolt;
-import backtype.storm.topology.OutputFieldsDeclarer;
-import backtype.storm.topology.TopologyBuilder;
-import backtype.storm.topology.base.BaseBasicBolt;
-import backtype.storm.topology.base.BaseRichSpout;
-import backtype.storm.tuple.Fields;
-import backtype.storm.tuple.Tuple;
-import backtype.storm.tuple.Values;
-import backtype.storm.utils.NimbusClient;
-import backtype.storm.utils.Utils;
-
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Random;
-import java.util.concurrent.ThreadLocalRandom;
-
-/**
- * WordCount but teh spout does not stop, and the bolts are implemented in
- * java.  This can show how fast the word count can run.
- */
-public class FastWordCountTopology {
-  public static class FastRandomSentenceSpout extends BaseRichSpout {
-    SpoutOutputCollector _collector;
-    Random _rand;
-    private static final String[] CHOICES = {
-        "marry had a little lamb whos fleese was white as snow",
-        "and every where that marry went the lamb was sure to go",
-        "one two three four five six seven eight nine ten",
-        "this is a test of the emergency broadcast system this is only a test",
-        "peter piper picked a peck of pickeled peppers"
-    };
-
-    @Override
-    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
-      _collector = collector;
-      _rand = ThreadLocalRandom.current();
-    }
-
-    @Override
-    public void nextTuple() {
-      String sentence = CHOICES[_rand.nextInt(CHOICES.length)];
-      _collector.emit(new Values(sentence), sentence);
-    }
-
-    @Override
-    public void ack(Object id) {
-        //Ignored
-    }
-
-    @Override
-    public void fail(Object id) {
-      _collector.emit(new Values(id), id);
-    }
-
-    @Override
-    public void declareOutputFields(OutputFieldsDeclarer declarer) {
-      declarer.declare(new Fields("sentence"));
-    }
-  }
-
-  public static class SplitSentence extends BaseBasicBolt {
-    @Override
-    public void execute(Tuple tuple, BasicOutputCollector collector) {
-      String sentence = tuple.getString(0);
-      for (String word: sentence.split("\\s+")) {
-          collector.emit(new Values(word, 1));
-      }
-    }
-
-    @Override
-    public void declareOutputFields(OutputFieldsDeclarer declarer) {
-      declarer.declare(new Fields("word", "count"));
-    }
-  }
-
-  public static class WordCount extends BaseBasicBolt {
-    Map<String, Integer> counts = new HashMap<String, Integer>();
-
-    @Override
-    public void execute(Tuple tuple, BasicOutputCollector collector) {
-      String word = tuple.getString(0);
-      Integer count = counts.get(word);
-      if (count == null)
-        count = 0;
-      count++;
-      counts.put(word, count);
-      collector.emit(new Values(word, count));
-    }
-
-    @Override
-    public void declareOutputFields(OutputFieldsDeclarer declarer) {
-      declarer.declare(new Fields("word", "count"));
-    }
-  }
-
-  public static void printMetrics(Nimbus.Client client, String name) throws Exception {
-    ClusterSummary summary = client.getClusterInfo();
-    String id = null;
-    for (TopologySummary ts: summary.get_topologies()) {
-      if (name.equals(ts.get_name())) {
-        id = ts.get_id();
-      }
-    }
-    if (id == null) {
-      throw new Exception("Could not find a topology named "+name);
-    }
-    TopologyInfo info = client.getTopologyInfo(id);
-    int uptime = info.get_uptime_secs();
-    long acked = 0;
-    long failed = 0;
-    double weightedAvgTotal = 0.0;
-    for (ExecutorSummary exec: info.get_executors()) {
-      if ("spout".equals(exec.get_component_id())) {
-        SpoutStats stats = exec.get_stats().get_specific().get_spout();
-        Map<String, Long> failedMap = stats.get_failed().get(":all-time");
-        Map<String, Long> ackedMap = stats.get_acked().get(":all-time");
-        Map<String, Double> avgLatMap = stats.get_complete_ms_avg().get(":all-time");
-        for (String key: ackedMap.keySet()) {
-          if (failedMap != null) {
-              Long tmp = failedMap.get(key);
-              if (tmp != null) {
-                  failed += tmp;
-              }
-          }
-          long ackVal = ackedMap.get(key);
-          double latVal = avgLatMap.get(key) * ackVal;
-          acked += ackVal;
-          weightedAvgTotal += latVal;
-        }
-      }
-    }
-    double avgLatency = weightedAvgTotal/acked;
-    System.out.println("uptime: "+uptime+" acked: "+acked+" avgLatency: "+avgLatency+" acked/sec: "+(((double)acked)/uptime+" failed: "+failed));
-  } 
-
-  public static void kill(Nimbus.Client client, String name) throws Exception {
-    KillOptions opts = new KillOptions();
-    opts.set_wait_secs(0);
-    client.killTopologyWithOpts(name, opts);
-  } 
-
-  public static void main(String[] args) throws Exception {
-
-    TopologyBuilder builder = new TopologyBuilder();
-
-    builder.setSpout("spout", new FastRandomSentenceSpout(), 4);
-
-    builder.setBolt("split", new SplitSentence(), 4).shuffleGrouping("spout");
-    builder.setBolt("count", new WordCount(), 4).fieldsGrouping("split", new Fields("word"));
-
-    Config conf = new Config();
-    conf.registerMetricsConsumer(backtype.storm.metric.LoggingMetricsConsumer.class);
-
-    String name = "wc-test";
-    if (args != null && args.length > 0) {
-        name = args[0];
-    }
-
-    conf.setNumWorkers(1);
-    StormSubmitter.submitTopologyWithProgressBar(name, conf, builder.createTopology());
-
-    Map clusterConf = Utils.readStormConfig();
-    clusterConf.putAll(Utils.readCommandLineOpts());
-    Nimbus.Client client = NimbusClient.getConfiguredClient(clusterConf).getClient();
-
-    //Sleep for 5 mins
-    for (int i = 0; i < 10; i++) {
-        Thread.sleep(30 * 1000);
-        printMetrics(client, name);
-    }
-    kill(client, name);
-  }
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/examples/storm-starter/src/jvm/storm/starter/InOrderDeliveryTest.java
----------------------------------------------------------------------
diff --git a/examples/storm-starter/src/jvm/storm/starter/InOrderDeliveryTest.java b/examples/storm-starter/src/jvm/storm/starter/InOrderDeliveryTest.java
deleted file mode 100644
index 5df0688..0000000
--- a/examples/storm-starter/src/jvm/storm/starter/InOrderDeliveryTest.java
+++ /dev/null
@@ -1,175 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package storm.starter;
-
-import backtype.storm.Config;
-import backtype.storm.StormSubmitter;
-import backtype.storm.generated.*;
-import backtype.storm.spout.SpoutOutputCollector;
-import backtype.storm.task.TopologyContext;
-import backtype.storm.topology.BasicOutputCollector;
-import backtype.storm.topology.FailedException;
-import backtype.storm.topology.IRichBolt;
-import backtype.storm.topology.OutputFieldsDeclarer;
-import backtype.storm.topology.TopologyBuilder;
-import backtype.storm.topology.base.BaseBasicBolt;
-import backtype.storm.topology.base.BaseRichSpout;
-import backtype.storm.tuple.Fields;
-import backtype.storm.tuple.Tuple;
-import backtype.storm.tuple.Values;
-import backtype.storm.utils.NimbusClient;
-import backtype.storm.utils.Utils;
-
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Random;
-
-public class InOrderDeliveryTest {
-  public static class InOrderSpout extends BaseRichSpout {
-    SpoutOutputCollector _collector;
-    int _base = 0;
-    int _i = 0;
-
-    @Override
-    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
-      _collector = collector;
-      _base = context.getThisTaskIndex();
-    }
-
-    @Override
-    public void nextTuple() {
-      Values v = new Values(_base, _i);
-      _collector.emit(v, "ACK");
-      _i++;
-    }
-
-    @Override
-    public void ack(Object id) {
-      //Ignored
-    }
-
-    @Override
-    public void fail(Object id) {
-      //Ignored
-    }
-
-    @Override
-    public void declareOutputFields(OutputFieldsDeclarer declarer) {
-      declarer.declare(new Fields("c1", "c2"));
-    }
-  }
-
-  public static class Check extends BaseBasicBolt {
-    Map<Integer, Integer> expected = new HashMap<Integer, Integer>();
-
-    @Override
-    public void execute(Tuple tuple, BasicOutputCollector collector) {
-      Integer c1 = tuple.getInteger(0);
-      Integer c2 = tuple.getInteger(1);
-      Integer exp = expected.get(c1);
-      if (exp == null) exp = 0;
-      if (c2.intValue() != exp.intValue()) {
-          System.out.println(c1+" "+c2+" != "+exp);
-          throw new FailedException(c1+" "+c2+" != "+exp);
-      }
-      exp = c2 + 1;
-      expected.put(c1, exp);
-    }
-
-    @Override
-    public void declareOutputFields(OutputFieldsDeclarer declarer) {
-      //Empty
-    }
-  }
-
-  public static void printMetrics(Nimbus.Client client, String name) throws Exception {
-    ClusterSummary summary = client.getClusterInfo();
-    String id = null;
-    for (TopologySummary ts: summary.get_topologies()) {
-      if (name.equals(ts.get_name())) {
-        id = ts.get_id();
-      }
-    }
-    if (id == null) {
-      throw new Exception("Could not find a topology named "+name);
-    }
-    TopologyInfo info = client.getTopologyInfo(id);
-    int uptime = info.get_uptime_secs();
-    long acked = 0;
-    long failed = 0;
-    double weightedAvgTotal = 0.0;
-    for (ExecutorSummary exec: info.get_executors()) {
-      if ("spout".equals(exec.get_component_id())) {
-        SpoutStats stats = exec.get_stats().get_specific().get_spout();
-        Map<String, Long> failedMap = stats.get_failed().get(":all-time");
-        Map<String, Long> ackedMap = stats.get_acked().get(":all-time");
-        Map<String, Double> avgLatMap = stats.get_complete_ms_avg().get(":all-time");
-        for (String key: ackedMap.keySet()) {
-          if (failedMap != null) {
-              Long tmp = failedMap.get(key);
-              if (tmp != null) {
-                  failed += tmp;
-              }
-          }
-          long ackVal = ackedMap.get(key);
-          double latVal = avgLatMap.get(key) * ackVal;
-          acked += ackVal;
-          weightedAvgTotal += latVal;
-        }
-      }
-    }
-    double avgLatency = weightedAvgTotal/acked;
-    System.out.println("uptime: "+uptime+" acked: "+acked+" avgLatency: "+avgLatency+" acked/sec: "+(((double)acked)/uptime+" failed: "+failed));
-  } 
-
-  public static void kill(Nimbus.Client client, String name) throws Exception {
-    KillOptions opts = new KillOptions();
-    opts.set_wait_secs(0);
-    client.killTopologyWithOpts(name, opts);
-  } 
-
-  public static void main(String[] args) throws Exception {
-
-    TopologyBuilder builder = new TopologyBuilder();
-
-    builder.setSpout("spout", new InOrderSpout(), 8);
-    builder.setBolt("count", new Check(), 8).fieldsGrouping("spout", new Fields("c1"));
-
-    Config conf = new Config();
-    conf.registerMetricsConsumer(backtype.storm.metric.LoggingMetricsConsumer.class);
-
-    String name = "in-order-test";
-    if (args != null && args.length > 0) {
-        name = args[0];
-    }
-
-    conf.setNumWorkers(1);
-    StormSubmitter.submitTopologyWithProgressBar(name, conf, builder.createTopology());
-
-    Map clusterConf = Utils.readStormConfig();
-    clusterConf.putAll(Utils.readCommandLineOpts());
-    Nimbus.Client client = NimbusClient.getConfiguredClient(clusterConf).getClient();
-
-    //Sleep for 50 mins
-    for (int i = 0; i < 50; i++) {
-        Thread.sleep(30 * 1000);
-        printMetrics(client, name);
-    }
-    kill(client, name);
-  }
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/examples/storm-starter/src/jvm/storm/starter/ManualDRPC.java
----------------------------------------------------------------------
diff --git a/examples/storm-starter/src/jvm/storm/starter/ManualDRPC.java b/examples/storm-starter/src/jvm/storm/starter/ManualDRPC.java
deleted file mode 100644
index fe0bae2..0000000
--- a/examples/storm-starter/src/jvm/storm/starter/ManualDRPC.java
+++ /dev/null
@@ -1,68 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package storm.starter;
-
-import backtype.storm.Config;
-import backtype.storm.LocalCluster;
-import backtype.storm.LocalDRPC;
-import backtype.storm.drpc.DRPCSpout;
-import backtype.storm.drpc.ReturnResults;
-import backtype.storm.topology.BasicOutputCollector;
-import backtype.storm.topology.OutputFieldsDeclarer;
-import backtype.storm.topology.TopologyBuilder;
-import backtype.storm.topology.base.BaseBasicBolt;
-import backtype.storm.tuple.Fields;
-import backtype.storm.tuple.Tuple;
-import backtype.storm.tuple.Values;
-
-
-public class ManualDRPC {
-  public static class ExclamationBolt extends BaseBasicBolt {
-
-    @Override
-    public void declareOutputFields(OutputFieldsDeclarer declarer) {
-      declarer.declare(new Fields("result", "return-info"));
-    }
-
-    @Override
-    public void execute(Tuple tuple, BasicOutputCollector collector) {
-      String arg = tuple.getString(0);
-      Object retInfo = tuple.getValue(1);
-      collector.emit(new Values(arg + "!!!", retInfo));
-    }
-
-  }
-
-  public static void main(String[] args) {
-    TopologyBuilder builder = new TopologyBuilder();
-    LocalDRPC drpc = new LocalDRPC();
-
-    DRPCSpout spout = new DRPCSpout("exclamation", drpc);
-    builder.setSpout("drpc", spout);
-    builder.setBolt("exclaim", new ExclamationBolt(), 3).shuffleGrouping("drpc");
-    builder.setBolt("return", new ReturnResults(), 3).shuffleGrouping("exclaim");
-
-    LocalCluster cluster = new LocalCluster();
-    Config conf = new Config();
-    cluster.submitTopology("exclaim", conf, builder.createTopology());
-
-    System.out.println(drpc.execute("exclamation", "aaa"));
-    System.out.println(drpc.execute("exclamation", "bbb"));
-
-  }
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/examples/storm-starter/src/jvm/storm/starter/MultipleLoggerTopology.java
----------------------------------------------------------------------
diff --git a/examples/storm-starter/src/jvm/storm/starter/MultipleLoggerTopology.java b/examples/storm-starter/src/jvm/storm/starter/MultipleLoggerTopology.java
deleted file mode 100644
index 4285ff9..0000000
--- a/examples/storm-starter/src/jvm/storm/starter/MultipleLoggerTopology.java
+++ /dev/null
@@ -1,105 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package storm.starter;
-
-import backtype.storm.Config;
-import backtype.storm.LocalCluster;
-import backtype.storm.StormSubmitter;
-import backtype.storm.task.OutputCollector;
-import backtype.storm.task.TopologyContext;
-import backtype.storm.testing.TestWordSpout;
-import backtype.storm.topology.OutputFieldsDeclarer;
-import backtype.storm.topology.TopologyBuilder;
-import backtype.storm.topology.base.BaseRichBolt;
-import backtype.storm.tuple.Fields;
-import backtype.storm.tuple.Tuple;
-import backtype.storm.tuple.Values;
-import backtype.storm.utils.Utils;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.List;
-import java.util.Map;
-
-/**
- * This is a basic example of a Storm topology.
- */
-public class MultipleLoggerTopology {
-  public static class ExclamationLoggingBolt extends BaseRichBolt {
-    OutputCollector _collector;
-    Logger _rootLogger = LoggerFactory.getLogger (Logger.ROOT_LOGGER_NAME);
-    // ensure the loggers are configured in the worker.xml before
-    // trying to use them here
-    Logger _logger = LoggerFactory.getLogger ("com.myapp");
-    Logger _subLogger = LoggerFactory.getLogger ("com.myapp.sub");
-
-    @Override
-    public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
-      _collector = collector;
-    }
-
-    @Override
-    public void execute(Tuple tuple) {
-      _rootLogger.debug ("root: This is a DEBUG message");
-      _rootLogger.info ("root: This is an INFO message");
-      _rootLogger.warn ("root: This is a WARN message");
-      _rootLogger.error ("root: This is an ERROR message");
-
-      _logger.debug ("myapp: This is a DEBUG message");
-      _logger.info ("myapp: This is an INFO message");
-      _logger.warn ("myapp: This is a WARN message");
-      _logger.error ("myapp: This is an ERROR message");
-
-      _subLogger.debug ("myapp.sub: This is a DEBUG message");
-      _subLogger.info ("myapp.sub: This is an INFO message");
-      _subLogger.warn ("myapp.sub: This is a WARN message");
-      _subLogger.error ("myapp.sub: This is an ERROR message");
-
-      _collector.emit(tuple, new Values(tuple.getString(0) + "!!!"));
-      _collector.ack(tuple);
-    }
-
-    @Override
-    public void declareOutputFields(OutputFieldsDeclarer declarer) {
-      declarer.declare(new Fields("word"));
-    }
-  }
-
-  public static void main(String[] args) throws Exception {
-    TopologyBuilder builder = new TopologyBuilder();
-
-    builder.setSpout("word", new TestWordSpout(), 10);
-    builder.setBolt("exclaim1", new ExclamationLoggingBolt(), 3).shuffleGrouping("word");
-    builder.setBolt("exclaim2", new ExclamationLoggingBolt(), 2).shuffleGrouping("exclaim1");
-
-    Config conf = new Config();
-    conf.setDebug(true);
-
-    if (args != null && args.length > 0) {
-      conf.setNumWorkers(2);
-      StormSubmitter.submitTopologyWithProgressBar(args[0], conf, builder.createTopology());
-    } else {
-      LocalCluster cluster = new LocalCluster();
-      cluster.submitTopology("test", conf, builder.createTopology());
-      Utils.sleep(10000);
-      cluster.killTopology("test");
-      cluster.shutdown();
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/examples/storm-starter/src/jvm/storm/starter/PrintSampleStream.java
----------------------------------------------------------------------
diff --git a/examples/storm-starter/src/jvm/storm/starter/PrintSampleStream.java b/examples/storm-starter/src/jvm/storm/starter/PrintSampleStream.java
deleted file mode 100644
index 021cc17..0000000
--- a/examples/storm-starter/src/jvm/storm/starter/PrintSampleStream.java
+++ /dev/null
@@ -1,58 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package storm.starter;
-
-import java.util.Arrays;
-
-import backtype.storm.Config;
-import backtype.storm.LocalCluster;
-import backtype.storm.topology.TopologyBuilder;
-import backtype.storm.utils.Utils;
-
-import storm.starter.bolt.PrinterBolt;
-import storm.starter.spout.TwitterSampleSpout;
-
-public class PrintSampleStream {        
-    public static void main(String[] args) {
-        String consumerKey = args[0]; 
-        String consumerSecret = args[1]; 
-        String accessToken = args[2]; 
-        String accessTokenSecret = args[3];
-        String[] arguments = args.clone();
-        String[] keyWords = Arrays.copyOfRange(arguments, 4, arguments.length);
-        
-        TopologyBuilder builder = new TopologyBuilder();
-        
-        builder.setSpout("twitter", new TwitterSampleSpout(consumerKey, consumerSecret,
-                                accessToken, accessTokenSecret, keyWords));
-        builder.setBolt("print", new PrinterBolt())
-                .shuffleGrouping("twitter");
-                
-                
-        Config conf = new Config();
-        
-        
-        LocalCluster cluster = new LocalCluster();
-        
-        cluster.submitTopology("test", conf, builder.createTopology());
-        
-        Utils.sleep(10000);
-        cluster.shutdown();
-    }
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/examples/storm-starter/src/jvm/storm/starter/ReachTopology.java
----------------------------------------------------------------------
diff --git a/examples/storm-starter/src/jvm/storm/starter/ReachTopology.java b/examples/storm-starter/src/jvm/storm/starter/ReachTopology.java
deleted file mode 100644
index 73ed45a..0000000
--- a/examples/storm-starter/src/jvm/storm/starter/ReachTopology.java
+++ /dev/null
@@ -1,196 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package storm.starter;
-
-import backtype.storm.Config;
-import backtype.storm.LocalCluster;
-import backtype.storm.LocalDRPC;
-import backtype.storm.StormSubmitter;
-import backtype.storm.coordination.BatchOutputCollector;
-import backtype.storm.drpc.LinearDRPCTopologyBuilder;
-import backtype.storm.task.TopologyContext;
-import backtype.storm.topology.BasicOutputCollector;
-import backtype.storm.topology.OutputFieldsDeclarer;
-import backtype.storm.topology.base.BaseBasicBolt;
-import backtype.storm.topology.base.BaseBatchBolt;
-import backtype.storm.tuple.Fields;
-import backtype.storm.tuple.Tuple;
-import backtype.storm.tuple.Values;
-
-import java.util.*;
-
-/**
- * This is a good example of doing complex Distributed RPC on top of Storm. This program creates a topology that can
- * compute the reach for any URL on Twitter in realtime by parallelizing the whole computation.
- * <p/>
- * Reach is the number of unique people exposed to a URL on Twitter. To compute reach, you have to get all the people
- * who tweeted the URL, get all the followers of all those people, unique that set of followers, and then count the
- * unique set. It's an intense computation that can involve thousands of database calls and tens of millions of follower
- * records.
- * <p/>
- * This Storm topology does every piece of that computation in parallel, turning what would be a computation that takes
- * minutes on a single machine into one that takes just a couple seconds.
- * <p/>
- * For the purposes of demonstration, this topology replaces the use of actual DBs with in-memory hashmaps.
- *
- * @see <a href="http://storm.apache.org/documentation/Distributed-RPC.html">Distributed RPC</a>
- */
-public class ReachTopology {
-  public static Map<String, List<String>> TWEETERS_DB = new HashMap<String, List<String>>() {{
-    put("foo.com/blog/1", Arrays.asList("sally", "bob", "tim", "george", "nathan"));
-    put("engineering.twitter.com/blog/5", Arrays.asList("adam", "david", "sally", "nathan"));
-    put("tech.backtype.com/blog/123", Arrays.asList("tim", "mike", "john"));
-  }};
-
-  public static Map<String, List<String>> FOLLOWERS_DB = new HashMap<String, List<String>>() {{
-    put("sally", Arrays.asList("bob", "tim", "alice", "adam", "jim", "chris", "jai"));
-    put("bob", Arrays.asList("sally", "nathan", "jim", "mary", "david", "vivian"));
-    put("tim", Arrays.asList("alex"));
-    put("nathan", Arrays.asList("sally", "bob", "adam", "harry", "chris", "vivian", "emily", "jordan"));
-    put("adam", Arrays.asList("david", "carissa"));
-    put("mike", Arrays.asList("john", "bob"));
-    put("john", Arrays.asList("alice", "nathan", "jim", "mike", "bob"));
-  }};
-
-  public static class GetTweeters extends BaseBasicBolt {
-    @Override
-    public void execute(Tuple tuple, BasicOutputCollector collector) {
-      Object id = tuple.getValue(0);
-      String url = tuple.getString(1);
-      List<String> tweeters = TWEETERS_DB.get(url);
-      if (tweeters != null) {
-        for (String tweeter : tweeters) {
-          collector.emit(new Values(id, tweeter));
-        }
-      }
-    }
-
-    @Override
-    public void declareOutputFields(OutputFieldsDeclarer declarer) {
-      declarer.declare(new Fields("id", "tweeter"));
-    }
-  }
-
-  public static class GetFollowers extends BaseBasicBolt {
-    @Override
-    public void execute(Tuple tuple, BasicOutputCollector collector) {
-      Object id = tuple.getValue(0);
-      String tweeter = tuple.getString(1);
-      List<String> followers = FOLLOWERS_DB.get(tweeter);
-      if (followers != null) {
-        for (String follower : followers) {
-          collector.emit(new Values(id, follower));
-        }
-      }
-    }
-
-    @Override
-    public void declareOutputFields(OutputFieldsDeclarer declarer) {
-      declarer.declare(new Fields("id", "follower"));
-    }
-  }
-
-  public static class PartialUniquer extends BaseBatchBolt {
-    BatchOutputCollector _collector;
-    Object _id;
-    Set<String> _followers = new HashSet<String>();
-
-    @Override
-    public void prepare(Map conf, TopologyContext context, BatchOutputCollector collector, Object id) {
-      _collector = collector;
-      _id = id;
-    }
-
-    @Override
-    public void execute(Tuple tuple) {
-      _followers.add(tuple.getString(1));
-    }
-
-    @Override
-    public void finishBatch() {
-      _collector.emit(new Values(_id, _followers.size()));
-    }
-
-    @Override
-    public void declareOutputFields(OutputFieldsDeclarer declarer) {
-      declarer.declare(new Fields("id", "partial-count"));
-    }
-  }
-
-  public static class CountAggregator extends BaseBatchBolt {
-    BatchOutputCollector _collector;
-    Object _id;
-    int _count = 0;
-
-    @Override
-    public void prepare(Map conf, TopologyContext context, BatchOutputCollector collector, Object id) {
-      _collector = collector;
-      _id = id;
-    }
-
-    @Override
-    public void execute(Tuple tuple) {
-      _count += tuple.getInteger(1);
-    }
-
-    @Override
-    public void finishBatch() {
-      _collector.emit(new Values(_id, _count));
-    }
-
-    @Override
-    public void declareOutputFields(OutputFieldsDeclarer declarer) {
-      declarer.declare(new Fields("id", "reach"));
-    }
-  }
-
-  public static LinearDRPCTopologyBuilder construct() {
-    LinearDRPCTopologyBuilder builder = new LinearDRPCTopologyBuilder("reach");
-    builder.addBolt(new GetTweeters(), 4);
-    builder.addBolt(new GetFollowers(), 12).shuffleGrouping();
-    builder.addBolt(new PartialUniquer(), 6).fieldsGrouping(new Fields("id", "follower"));
-    builder.addBolt(new CountAggregator(), 3).fieldsGrouping(new Fields("id"));
-    return builder;
-  }
-
-  public static void main(String[] args) throws Exception {
-    LinearDRPCTopologyBuilder builder = construct();
-
-
-    Config conf = new Config();
-
-    if (args == null || args.length == 0) {
-      conf.setMaxTaskParallelism(3);
-      LocalDRPC drpc = new LocalDRPC();
-      LocalCluster cluster = new LocalCluster();
-      cluster.submitTopology("reach-drpc", conf, builder.createLocalTopology(drpc));
-
-      String[] urlsToTry = new String[]{ "foo.com/blog/1", "engineering.twitter.com/blog/5", "notaurl.com" };
-      for (String url : urlsToTry) {
-        System.out.println("Reach of " + url + ": " + drpc.execute("reach", url));
-      }
-
-      cluster.shutdown();
-      drpc.shutdown();
-    }
-    else {
-      conf.setNumWorkers(6);
-      StormSubmitter.submitTopologyWithProgressBar(args[0], conf, builder.createRemoteTopology());
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/examples/storm-starter/src/jvm/storm/starter/ResourceAwareExampleTopology.java
----------------------------------------------------------------------
diff --git a/examples/storm-starter/src/jvm/storm/starter/ResourceAwareExampleTopology.java b/examples/storm-starter/src/jvm/storm/starter/ResourceAwareExampleTopology.java
deleted file mode 100644
index 0fb3724..0000000
--- a/examples/storm-starter/src/jvm/storm/starter/ResourceAwareExampleTopology.java
+++ /dev/null
@@ -1,106 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package storm.starter;
-
-import backtype.storm.Config;
-import backtype.storm.LocalCluster;
-import backtype.storm.StormSubmitter;
-import backtype.storm.task.OutputCollector;
-import backtype.storm.task.TopologyContext;
-import backtype.storm.testing.TestWordSpout;
-import backtype.storm.topology.BoltDeclarer;
-import backtype.storm.topology.OutputFieldsDeclarer;
-import backtype.storm.topology.SpoutDeclarer;
-import backtype.storm.topology.TopologyBuilder;
-import backtype.storm.topology.base.BaseRichBolt;
-import backtype.storm.tuple.Fields;
-import backtype.storm.tuple.Tuple;
-import backtype.storm.tuple.Values;
-import backtype.storm.utils.Utils;
-
-import java.util.Map;
-
-public class ResourceAwareExampleTopology {
-  public static class ExclamationBolt extends BaseRichBolt {
-    OutputCollector _collector;
-
-    @Override
-    public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
-      _collector = collector;
-    }
-
-    @Override
-    public void execute(Tuple tuple) {
-      _collector.emit(tuple, new Values(tuple.getString(0) + "!!!"));
-      _collector.ack(tuple);
-    }
-
-    @Override
-    public void declareOutputFields(OutputFieldsDeclarer declarer) {
-      declarer.declare(new Fields("word"));
-    }
-  }
-
-  public static void main(String[] args) throws Exception {
-    TopologyBuilder builder = new TopologyBuilder();
-
-    SpoutDeclarer spout =  builder.setSpout("word", new TestWordSpout(), 10);
-    //set cpu requirement
-    spout.setCPULoad(20);
-    //set onheap and offheap memory requirement
-    spout.setMemoryLoad(64, 16);
-
-    BoltDeclarer bolt1 = builder.setBolt("exclaim1", new ExclamationBolt(), 3).shuffleGrouping("word");
-    //sets cpu requirement.  Not neccessary to set both CPU and memory.
-    //For requirements not set, a default value will be used
-    bolt1.setCPULoad(15);
-
-    BoltDeclarer bolt2 = builder.setBolt("exclaim2", new ExclamationBolt(), 2).shuffleGrouping("exclaim1");
-    bolt2.setMemoryLoad(100);
-
-    Config conf = new Config();
-    conf.setDebug(true);
-
-    /**
-     * Use to limit the maximum amount of memory (in MB) allocated to one worker process.
-     * Can be used to spread executors to to multiple workers
-     */
-    conf.setTopologyWorkerMaxHeapSize(1024.0);
-
-    //topology priority describing the importance of the topology in decreasing importance starting from 0 (i.e. 0 is the highest priority and the priority importance decreases as the priority number increases).
-    //Recommended range of 0-29 but no hard limit set.
-    conf.setTopologyPriority(29);
-
-    // Set strategy to schedule topology. If not specified, default to backtype.storm.scheduler.resource.strategies.scheduling.DefaultResourceAwareStrategy
-    conf.setTopologyStrategy(backtype.storm.scheduler.resource.strategies.scheduling.DefaultResourceAwareStrategy.class);
-
-    if (args != null && args.length > 0) {
-      conf.setNumWorkers(3);
-
-      StormSubmitter.submitTopologyWithProgressBar(args[0], conf, builder.createTopology());
-    }
-    else {
-
-      LocalCluster cluster = new LocalCluster();
-      cluster.submitTopology("test", conf, builder.createTopology());
-      Utils.sleep(10000);
-      cluster.killTopology("test");
-      cluster.shutdown();
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/examples/storm-starter/src/jvm/storm/starter/RollingTopWords.java
----------------------------------------------------------------------
diff --git a/examples/storm-starter/src/jvm/storm/starter/RollingTopWords.java b/examples/storm-starter/src/jvm/storm/starter/RollingTopWords.java
deleted file mode 100644
index 762c22a..0000000
--- a/examples/storm-starter/src/jvm/storm/starter/RollingTopWords.java
+++ /dev/null
@@ -1,130 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package storm.starter;
-
-import backtype.storm.Config;
-import backtype.storm.testing.TestWordSpout;
-import backtype.storm.topology.TopologyBuilder;
-import backtype.storm.tuple.Fields;
-import org.apache.log4j.Logger;
-import storm.starter.bolt.IntermediateRankingsBolt;
-import storm.starter.bolt.RollingCountBolt;
-import storm.starter.bolt.TotalRankingsBolt;
-import storm.starter.util.StormRunner;
-
-/**
- * This topology does a continuous computation of the top N words that the topology has seen in terms of cardinality.
- * The top N computation is done in a completely scalable way, and a similar approach could be used to compute things
- * like trending topics or trending images on Twitter.
- */
-public class RollingTopWords {
-
-  private static final Logger LOG = Logger.getLogger(RollingTopWords.class);
-  private static final int DEFAULT_RUNTIME_IN_SECONDS = 60;
-  private static final int TOP_N = 5;
-
-  private final TopologyBuilder builder;
-  private final String topologyName;
-  private final Config topologyConfig;
-  private final int runtimeInSeconds;
-
-  public RollingTopWords(String topologyName) throws InterruptedException {
-    builder = new TopologyBuilder();
-    this.topologyName = topologyName;
-    topologyConfig = createTopologyConfiguration();
-    runtimeInSeconds = DEFAULT_RUNTIME_IN_SECONDS;
-
-    wireTopology();
-  }
-
-  private static Config createTopologyConfiguration() {
-    Config conf = new Config();
-    conf.setDebug(true);
-    return conf;
-  }
-
-  private void wireTopology() throws InterruptedException {
-    String spoutId = "wordGenerator";
-    String counterId = "counter";
-    String intermediateRankerId = "intermediateRanker";
-    String totalRankerId = "finalRanker";
-    builder.setSpout(spoutId, new TestWordSpout(), 5);
-    builder.setBolt(counterId, new RollingCountBolt(9, 3), 4).fieldsGrouping(spoutId, new Fields("word"));
-    builder.setBolt(intermediateRankerId, new IntermediateRankingsBolt(TOP_N), 4).fieldsGrouping(counterId, new Fields(
-        "obj"));
-    builder.setBolt(totalRankerId, new TotalRankingsBolt(TOP_N)).globalGrouping(intermediateRankerId);
-  }
-
-  public void runLocally() throws InterruptedException {
-    StormRunner.runTopologyLocally(builder.createTopology(), topologyName, topologyConfig, runtimeInSeconds);
-  }
-
-  public void runRemotely() throws Exception {
-    StormRunner.runTopologyRemotely(builder.createTopology(), topologyName, topologyConfig);
-  }
-
-  /**
-   * Submits (runs) the topology.
-   *
-   * Usage: "RollingTopWords [topology-name] [local|remote]"
-   *
-   * By default, the topology is run locally under the name "slidingWindowCounts".
-   *
-   * Examples:
-   *
-   * ```
-   *
-   * # Runs in local mode (LocalCluster), with topology name "slidingWindowCounts"
-   * $ storm jar storm-starter-jar-with-dependencies.jar storm.starter.RollingTopWords
-   *
-   * # Runs in local mode (LocalCluster), with topology name "foobar"
-   * $ storm jar storm-starter-jar-with-dependencies.jar storm.starter.RollingTopWords foobar
-   *
-   * # Runs in local mode (LocalCluster), with topology name "foobar"
-   * $ storm jar storm-starter-jar-with-dependencies.jar storm.starter.RollingTopWords foobar local
-   *
-   * # Runs in remote/cluster mode, with topology name "production-topology"
-   * $ storm jar storm-starter-jar-with-dependencies.jar storm.starter.RollingTopWords production-topology remote
-   * ```
-   *
-   * @param args First positional argument (optional) is topology name, second positional argument (optional) defines
-   *             whether to run the topology locally ("local") or remotely, i.e. on a real cluster ("remote").
-   * @throws Exception
-   */
-  public static void main(String[] args) throws Exception {
-    String topologyName = "slidingWindowCounts";
-    if (args.length >= 1) {
-      topologyName = args[0];
-    }
-    boolean runLocally = true;
-    if (args.length >= 2 && args[1].equalsIgnoreCase("remote")) {
-      runLocally = false;
-    }
-
-    LOG.info("Topology name: " + topologyName);
-    RollingTopWords rtw = new RollingTopWords(topologyName);
-    if (runLocally) {
-      LOG.info("Running in local mode");
-      rtw.runLocally();
-    }
-    else {
-      LOG.info("Running in remote (cluster) mode");
-      rtw.runRemotely();
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/examples/storm-starter/src/jvm/storm/starter/SingleJoinExample.java
----------------------------------------------------------------------
diff --git a/examples/storm-starter/src/jvm/storm/starter/SingleJoinExample.java b/examples/storm-starter/src/jvm/storm/starter/SingleJoinExample.java
deleted file mode 100644
index cb1d98c..0000000
--- a/examples/storm-starter/src/jvm/storm/starter/SingleJoinExample.java
+++ /dev/null
@@ -1,64 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package storm.starter;
-
-import backtype.storm.Config;
-import backtype.storm.LocalCluster;
-import backtype.storm.testing.FeederSpout;
-import backtype.storm.topology.TopologyBuilder;
-import backtype.storm.tuple.Fields;
-import backtype.storm.tuple.Values;
-import backtype.storm.utils.Utils;
-import storm.starter.bolt.SingleJoinBolt;
-
-public class SingleJoinExample {
-  public static void main(String[] args) {
-    FeederSpout genderSpout = new FeederSpout(new Fields("id", "gender"));
-    FeederSpout ageSpout = new FeederSpout(new Fields("id", "age"));
-
-    TopologyBuilder builder = new TopologyBuilder();
-    builder.setSpout("gender", genderSpout);
-    builder.setSpout("age", ageSpout);
-    builder.setBolt("join", new SingleJoinBolt(new Fields("gender", "age"))).fieldsGrouping("gender", new Fields("id"))
-        .fieldsGrouping("age", new Fields("id"));
-
-    Config conf = new Config();
-    conf.setDebug(true);
-
-    LocalCluster cluster = new LocalCluster();
-    cluster.submitTopology("join-example", conf, builder.createTopology());
-
-    for (int i = 0; i < 10; i++) {
-      String gender;
-      if (i % 2 == 0) {
-        gender = "male";
-      }
-      else {
-        gender = "female";
-      }
-      genderSpout.feed(new Values(i, gender));
-    }
-
-    for (int i = 9; i >= 0; i--) {
-      ageSpout.feed(new Values(i, i + 20));
-    }
-
-    Utils.sleep(2000);
-    cluster.shutdown();
-  }
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/examples/storm-starter/src/jvm/storm/starter/SkewedRollingTopWords.java
----------------------------------------------------------------------
diff --git a/examples/storm-starter/src/jvm/storm/starter/SkewedRollingTopWords.java b/examples/storm-starter/src/jvm/storm/starter/SkewedRollingTopWords.java
deleted file mode 100644
index 443c051..0000000
--- a/examples/storm-starter/src/jvm/storm/starter/SkewedRollingTopWords.java
+++ /dev/null
@@ -1,132 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package storm.starter;
-
-import backtype.storm.Config;
-import backtype.storm.testing.TestWordSpout;
-import backtype.storm.topology.TopologyBuilder;
-import backtype.storm.tuple.Fields;
-import org.apache.log4j.Logger;
-import storm.starter.bolt.IntermediateRankingsBolt;
-import storm.starter.bolt.RollingCountBolt;
-import storm.starter.bolt.RollingCountAggBolt;
-import storm.starter.bolt.TotalRankingsBolt;
-import storm.starter.util.StormRunner;
-
-/**
- * This topology does a continuous computation of the top N words that the topology has seen in terms of cardinality.
- * The top N computation is done in a completely scalable way, and a similar approach could be used to compute things
- * like trending topics or trending images on Twitter. It takes an approach that assumes that some works will be much
- * more common then other words, and uses partialKeyGrouping to better balance the skewed load.
- */
-public class SkewedRollingTopWords {
-  private static final Logger LOG = Logger.getLogger(SkewedRollingTopWords.class);
-  private static final int DEFAULT_RUNTIME_IN_SECONDS = 60;
-  private static final int TOP_N = 5;
-
-  private final TopologyBuilder builder;
-  private final String topologyName;
-  private final Config topologyConfig;
-  private final int runtimeInSeconds;
-
-  public SkewedRollingTopWords(String topologyName) throws InterruptedException {
-    builder = new TopologyBuilder();
-    this.topologyName = topologyName;
-    topologyConfig = createTopologyConfiguration();
-    runtimeInSeconds = DEFAULT_RUNTIME_IN_SECONDS;
-
-    wireTopology();
-  }
-
-  private static Config createTopologyConfiguration() {
-    Config conf = new Config();
-    conf.setDebug(true);
-    return conf;
-  }
-
-  private void wireTopology() throws InterruptedException {
-    String spoutId = "wordGenerator";
-    String counterId = "counter";
-    String aggId = "aggregator";
-    String intermediateRankerId = "intermediateRanker";
-    String totalRankerId = "finalRanker";
-    builder.setSpout(spoutId, new TestWordSpout(), 5);
-    builder.setBolt(counterId, new RollingCountBolt(9, 3), 4).partialKeyGrouping(spoutId, new Fields("word"));
-    builder.setBolt(aggId, new RollingCountAggBolt(), 4).fieldsGrouping(counterId, new Fields("obj"));
-    builder.setBolt(intermediateRankerId, new IntermediateRankingsBolt(TOP_N), 4).fieldsGrouping(aggId, new Fields("obj"));
-    builder.setBolt(totalRankerId, new TotalRankingsBolt(TOP_N)).globalGrouping(intermediateRankerId);
-  }
-
-  public void runLocally() throws InterruptedException {
-    StormRunner.runTopologyLocally(builder.createTopology(), topologyName, topologyConfig, runtimeInSeconds);
-  }
-
-  public void runRemotely() throws Exception {
-    StormRunner.runTopologyRemotely(builder.createTopology(), topologyName, topologyConfig);
-  }
-
-  /**
-   * Submits (runs) the topology.
-   *
-   * Usage: "RollingTopWords [topology-name] [local|remote]"
-   *
-   * By default, the topology is run locally under the name "slidingWindowCounts".
-   *
-   * Examples:
-   *
-   * ```
-   *
-   * # Runs in local mode (LocalCluster), with topology name "slidingWindowCounts"
-   * $ storm jar storm-starter-jar-with-dependencies.jar storm.starter.RollingTopWords
-   *
-   * # Runs in local mode (LocalCluster), with topology name "foobar"
-   * $ storm jar storm-starter-jar-with-dependencies.jar storm.starter.RollingTopWords foobar
-   *
-   * # Runs in local mode (LocalCluster), with topology name "foobar"
-   * $ storm jar storm-starter-jar-with-dependencies.jar storm.starter.RollingTopWords foobar local
-   *
-   * # Runs in remote/cluster mode, with topology name "production-topology"
-   * $ storm jar storm-starter-jar-with-dependencies.jar storm.starter.RollingTopWords production-topology remote
-   * ```
-   *
-   * @param args First positional argument (optional) is topology name, second positional argument (optional) defines
-   *             whether to run the topology locally ("local") or remotely, i.e. on a real cluster ("remote").
-   * @throws Exception
-   */
-  public static void main(String[] args) throws Exception {
-    String topologyName = "slidingWindowCounts";
-    if (args.length >= 1) {
-      topologyName = args[0];
-    }
-    boolean runLocally = true;
-    if (args.length >= 2 && args[1].equalsIgnoreCase("remote")) {
-      runLocally = false;
-    }
-
-    LOG.info("Topology name: " + topologyName);
-    SkewedRollingTopWords rtw = new SkewedRollingTopWords(topologyName);
-    if (runLocally) {
-      LOG.info("Running in local mode");
-      rtw.runLocally();
-    }
-    else {
-      LOG.info("Running in remote (cluster) mode");
-      rtw.runRemotely();
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/examples/storm-starter/src/jvm/storm/starter/SlidingTupleTsTopology.java
----------------------------------------------------------------------
diff --git a/examples/storm-starter/src/jvm/storm/starter/SlidingTupleTsTopology.java b/examples/storm-starter/src/jvm/storm/starter/SlidingTupleTsTopology.java
deleted file mode 100644
index 598335d..0000000
--- a/examples/storm-starter/src/jvm/storm/starter/SlidingTupleTsTopology.java
+++ /dev/null
@@ -1,62 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package storm.starter;
-
-import backtype.storm.Config;
-import backtype.storm.LocalCluster;
-import backtype.storm.StormSubmitter;
-import backtype.storm.topology.TopologyBuilder;
-import backtype.storm.topology.base.BaseWindowedBolt;
-import backtype.storm.utils.Utils;
-import storm.starter.bolt.PrinterBolt;
-import storm.starter.bolt.SlidingWindowSumBolt;
-import storm.starter.spout.RandomIntegerSpout;
-
-import java.util.concurrent.TimeUnit;
-
-import static backtype.storm.topology.base.BaseWindowedBolt.Duration;
-
-/**
- * Windowing based on tuple timestamp (e.g. the time when tuple is generated
- * rather than when its processed).
- */
-public class SlidingTupleTsTopology {
-    public static void main(String[] args) throws Exception {
-        TopologyBuilder builder = new TopologyBuilder();
-        BaseWindowedBolt bolt = new SlidingWindowSumBolt()
-                .withWindow(new Duration(5, TimeUnit.SECONDS), new Duration(3, TimeUnit.SECONDS))
-                .withTimestampField("ts")
-                .withLag(new Duration(5, TimeUnit.SECONDS));
-        builder.setSpout("integer", new RandomIntegerSpout(), 1);
-        builder.setBolt("slidingsum", bolt, 1).shuffleGrouping("integer");
-        builder.setBolt("printer", new PrinterBolt(), 1).shuffleGrouping("slidingsum");
-        Config conf = new Config();
-        conf.setDebug(true);
-
-        if (args != null && args.length > 0) {
-            conf.setNumWorkers(1);
-            StormSubmitter.submitTopologyWithProgressBar(args[0], conf, builder.createTopology());
-        } else {
-            LocalCluster cluster = new LocalCluster();
-            cluster.submitTopology("test", conf, builder.createTopology());
-            Utils.sleep(40000);
-            cluster.killTopology("test");
-            cluster.shutdown();
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/examples/storm-starter/src/jvm/storm/starter/SlidingWindowTopology.java
----------------------------------------------------------------------
diff --git a/examples/storm-starter/src/jvm/storm/starter/SlidingWindowTopology.java b/examples/storm-starter/src/jvm/storm/starter/SlidingWindowTopology.java
deleted file mode 100644
index 5031f8d..0000000
--- a/examples/storm-starter/src/jvm/storm/starter/SlidingWindowTopology.java
+++ /dev/null
@@ -1,108 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package storm.starter;
-
-import backtype.storm.Config;
-import backtype.storm.LocalCluster;
-import backtype.storm.StormSubmitter;
-import backtype.storm.task.OutputCollector;
-import backtype.storm.task.TopologyContext;
-import backtype.storm.topology.OutputFieldsDeclarer;
-import backtype.storm.topology.TopologyBuilder;
-import backtype.storm.topology.base.BaseWindowedBolt;
-import backtype.storm.tuple.Fields;
-import backtype.storm.tuple.Tuple;
-import backtype.storm.tuple.Values;
-import backtype.storm.utils.Utils;
-import backtype.storm.windowing.TupleWindow;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import storm.starter.bolt.PrinterBolt;
-import storm.starter.bolt.SlidingWindowSumBolt;
-import storm.starter.spout.RandomIntegerSpout;
-
-import java.util.List;
-import java.util.Map;
-
-import static backtype.storm.topology.base.BaseWindowedBolt.Count;
-
-/**
- * A sample topology that demonstrates the usage of {@link backtype.storm.topology.IWindowedBolt}
- * to calculate sliding window sum.
- */
-public class SlidingWindowTopology {
-
-    private static final Logger LOG = LoggerFactory.getLogger(SlidingWindowTopology.class);
-
-    /*
-     * Computes tumbling window average
-     */
-    private static class TumblingWindowAvgBolt extends BaseWindowedBolt {
-        private OutputCollector collector;
-
-        @Override
-        public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
-            this.collector = collector;
-        }
-
-        @Override
-        public void execute(TupleWindow inputWindow) {
-            int sum = 0;
-            List<Tuple> tuplesInWindow = inputWindow.get();
-            LOG.debug("Events in current window: " + tuplesInWindow.size());
-            if (tuplesInWindow.size() > 0) {
-                /*
-                * Since this is a tumbling window calculation,
-                * we use all the tuples in the window to compute the avg.
-                */
-                for (Tuple tuple : tuplesInWindow) {
-                    sum += (int) tuple.getValue(0);
-                }
-                collector.emit(new Values(sum / tuplesInWindow.size()));
-            }
-        }
-
-        @Override
-        public void declareOutputFields(OutputFieldsDeclarer declarer) {
-            declarer.declare(new Fields("avg"));
-        }
-    }
-
-
-    public static void main(String[] args) throws Exception {
-        TopologyBuilder builder = new TopologyBuilder();
-        builder.setSpout("integer", new RandomIntegerSpout(), 1);
-        builder.setBolt("slidingsum", new SlidingWindowSumBolt().withWindow(new Count(30), new Count(10)), 1)
-                .shuffleGrouping("integer");
-        builder.setBolt("tumblingavg", new TumblingWindowAvgBolt().withTumblingWindow(new Count(3)), 1)
-                .shuffleGrouping("slidingsum");
-        builder.setBolt("printer", new PrinterBolt(), 1).shuffleGrouping("tumblingavg");
-        Config conf = new Config();
-        conf.setDebug(true);
-        if (args != null && args.length > 0) {
-            conf.setNumWorkers(1);
-            StormSubmitter.submitTopologyWithProgressBar(args[0], conf, builder.createTopology());
-        } else {
-            LocalCluster cluster = new LocalCluster();
-            cluster.submitTopology("test", conf, builder.createTopology());
-            Utils.sleep(40000);
-            cluster.killTopology("test");
-            cluster.shutdown();
-        }
-    }
-}