You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by ka...@apache.org on 2017/04/05 22:33:59 UTC

[03/14] storm git commit: STORM-2416 Release Packaging Improvements

http://git-wip-us.apache.org/repos/asf/storm/blob/2a369e40/storm-perf/src/main/conf/KafkaHdfsTopo.yaml
----------------------------------------------------------------------
diff --git a/storm-perf/src/main/conf/KafkaHdfsTopo.yaml b/storm-perf/src/main/conf/KafkaHdfsTopo.yaml
deleted file mode 100755
index a8ed2f2..0000000
--- a/storm-perf/src/main/conf/KafkaHdfsTopo.yaml
+++ /dev/null
@@ -1,26 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-spout.count : 1
-bolt.count : 1
-kafka.topic : "kafka_topic"
-zk.uri : "zkhostname:2181"
-hdfs.uri : "hdfs://hdfs.namenode:8020"
-hdfs.dir : "/tmp/storm"
-hdfs.batch : 1000
-
-# storm config overrides
-topology.workers : 1
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/storm/blob/2a369e40/storm-perf/src/main/conf/KafkaSpoutNullBoltTopo.yaml
----------------------------------------------------------------------
diff --git a/storm-perf/src/main/conf/KafkaSpoutNullBoltTopo.yaml b/storm-perf/src/main/conf/KafkaSpoutNullBoltTopo.yaml
deleted file mode 100644
index cde4c2e..0000000
--- a/storm-perf/src/main/conf/KafkaSpoutNullBoltTopo.yaml
+++ /dev/null
@@ -1,23 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-spout.count : 1
-bolt.count : 1
-kafka.topic : "kafka_topic"
-zk.uri : "zkhostname:2181"
-
-# storm config overrides
-topology.workers : 1
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/storm/blob/2a369e40/storm-perf/src/main/conf/StrGenSpoutHdfsBoltTopo.yaml
----------------------------------------------------------------------
diff --git a/storm-perf/src/main/conf/StrGenSpoutHdfsBoltTopo.yaml b/storm-perf/src/main/conf/StrGenSpoutHdfsBoltTopo.yaml
deleted file mode 100644
index d16431b..0000000
--- a/storm-perf/src/main/conf/StrGenSpoutHdfsBoltTopo.yaml
+++ /dev/null
@@ -1,25 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-spout.count : 1
-bolt.count : 1
-hdfs.uri : "hdfs://hdfs.namenode:8020"
-hdfs.dir : "/tmp/storm"
-hdfs.batch : 1000
-
-
-# storm config overrides
-topology.workers : 1
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/storm/blob/2a369e40/storm-perf/src/main/java/org/apache/storm/perf/ConstSpoutIdBoltNullBoltTopo.java
----------------------------------------------------------------------
diff --git a/storm-perf/src/main/java/org/apache/storm/perf/ConstSpoutIdBoltNullBoltTopo.java b/storm-perf/src/main/java/org/apache/storm/perf/ConstSpoutIdBoltNullBoltTopo.java
deleted file mode 100644
index 11c63d3..0000000
--- a/storm-perf/src/main/java/org/apache/storm/perf/ConstSpoutIdBoltNullBoltTopo.java
+++ /dev/null
@@ -1,101 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License
- */
-
-package org.apache.storm.perf;
-
-import org.apache.storm.Config;
-import org.apache.storm.LocalCluster;
-import org.apache.storm.generated.StormTopology;
-import org.apache.storm.perf.bolt.DevNullBolt;
-import org.apache.storm.perf.bolt.IdBolt;
-import org.apache.storm.perf.spout.ConstSpout;
-import org.apache.storm.perf.utils.Helper;
-import org.apache.storm.topology.TopologyBuilder;
-import org.apache.storm.utils.Utils;
-
-import java.util.Map;
-
-/**
- * ConstSpout -> IdBolt -> DevNullBolt
- * This topology measures speed of messaging between spouts->bolt  and  bolt->bolt
- *   ConstSpout : Continuously emits a constant string
- *   IdBolt : clones and emits input tuples
- *   DevNullBolt : discards incoming tuples
- */
-public class ConstSpoutIdBoltNullBoltTopo {
-
-    public static final String TOPOLOGY_NAME = "ConstSpoutIdBoltNullBoltTopo";
-    public static final String SPOUT_ID = "constSpout";
-    public static final String BOLT1_ID = "idBolt";
-    public static final String BOLT2_ID = "nullBolt";
-
-    // Configs
-    public static final String BOLT1_COUNT = "bolt1.count";
-    public static final String BOLT2_COUNT = "bolt2.count";
-    public static final String SPOUT_COUNT = "spout.count";
-
-    public static StormTopology getTopology(Map conf) {
-
-        // 1 -  Setup Spout   --------
-        ConstSpout spout = new ConstSpout("some data").withOutputFields("str");
-
-        // 2 -  Setup IdBolt & DevNullBolt   --------
-        IdBolt bolt1 = new IdBolt();
-        DevNullBolt bolt2 = new DevNullBolt();
-
-
-        // 3 - Setup Topology  --------
-        TopologyBuilder builder = new TopologyBuilder();
-
-        builder.setSpout(SPOUT_ID, spout,  Helper.getInt(conf, SPOUT_COUNT, 1) );
-
-        builder.setBolt(BOLT1_ID, bolt1, Helper.getInt(conf, BOLT1_COUNT, 1))
-                .localOrShuffleGrouping(SPOUT_ID);
-
-        builder.setBolt(BOLT2_ID, bolt2, Helper.getInt(conf, BOLT2_COUNT, 1))
-                .localOrShuffleGrouping(BOLT1_ID);
-
-        return builder.createTopology();
-    }
-
-
-    public static void main(String[] args) throws Exception {
-
-        if (args.length <= 0) {
-            // submit to local cluster
-            Config conf = new Config();
-            LocalCluster cluster = Helper.runOnLocalCluster(TOPOLOGY_NAME, getTopology(conf));
-
-            Helper.setupShutdownHook(cluster, TOPOLOGY_NAME);
-            while (true) {//  run indefinitely till Ctrl-C
-                Thread.sleep(20_000_000);
-            }
-        } else {
-            // submit to real cluster
-            if (args.length >2) {
-                System.err.println("args: runDurationSec  [optionalConfFile]");
-                return;
-            }
-            Integer durationSec = Integer.parseInt(args[0]);
-            Map topoConf =  (args.length==2) ? Utils.findAndReadConfigFile(args[1])  : new Config();
-
-            //  Submit topology to storm cluster
-            Helper.runOnClusterAndPrintMetrics(durationSec, TOPOLOGY_NAME, topoConf, getTopology(topoConf));
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/2a369e40/storm-perf/src/main/java/org/apache/storm/perf/ConstSpoutNullBoltTopo.java
----------------------------------------------------------------------
diff --git a/storm-perf/src/main/java/org/apache/storm/perf/ConstSpoutNullBoltTopo.java b/storm-perf/src/main/java/org/apache/storm/perf/ConstSpoutNullBoltTopo.java
deleted file mode 100755
index 92c2787..0000000
--- a/storm-perf/src/main/java/org/apache/storm/perf/ConstSpoutNullBoltTopo.java
+++ /dev/null
@@ -1,107 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License
- */
-
-package org.apache.storm.perf;
-
-import org.apache.storm.Config;
-import org.apache.storm.LocalCluster;
-import org.apache.storm.generated.StormTopology;
-import org.apache.storm.perf.bolt.DevNullBolt;
-import org.apache.storm.perf.spout.ConstSpout;
-import org.apache.storm.perf.utils.Helper;
-import org.apache.storm.topology.BoltDeclarer;
-import org.apache.storm.topology.TopologyBuilder;
-import org.apache.storm.utils.Utils;
-
-import java.util.Map;
-
-/***
- * This topo helps measure the messaging speed between a spout and a bolt.
- *  Spout generates a stream of a fixed string.
- *  Bolt will simply ack and discard the tuple received
- */
-
-public class ConstSpoutNullBoltTopo {
-
-    public static final String TOPOLOGY_NAME = "ConstSpoutNullBoltTopo";
-    public static final String SPOUT_ID = "constSpout";
-    public static final String BOLT_ID = "nullBolt";
-
-    // Configs
-    public static final String BOLT_COUNT = "bolt.count";
-    public static final String SPOUT_COUNT = "spout.count";
-    public static final String GROUPING = "grouping"; // can be 'local' or 'shuffle'
-
-    public static final String LOCAL_GROPING = "local";
-    public static final String SHUFFLE_GROUPING = "shuffle";
-    public static final String DEFAULT_GROUPING = LOCAL_GROPING;
-
-    public static StormTopology getTopology(Map conf) {
-
-        // 1 -  Setup Spout   --------
-        ConstSpout spout = new ConstSpout("some data").withOutputFields("str");
-
-        // 2 -  Setup DevNull Bolt   --------
-        DevNullBolt bolt = new DevNullBolt();
-
-
-        // 3 - Setup Topology  --------
-        TopologyBuilder builder = new TopologyBuilder();
-
-        builder.setSpout(SPOUT_ID, spout,  Helper.getInt(conf, SPOUT_COUNT, 1) );
-        BoltDeclarer bd = builder.setBolt(BOLT_ID, bolt, Helper.getInt(conf, BOLT_COUNT, 1));
-
-        String groupingType = Helper.getStr(conf, GROUPING);
-        if(groupingType==null || groupingType.equalsIgnoreCase(DEFAULT_GROUPING) )
-            bd.localOrShuffleGrouping(SPOUT_ID);
-        else if(groupingType.equalsIgnoreCase(SHUFFLE_GROUPING) )
-            bd.shuffleGrouping(SPOUT_ID);
-        return builder.createTopology();
-    }
-
-    /**
-     * ConstSpout -> DevNullBolt with configurable grouping (default localOrShuffle)
-     */
-    public static void main(String[] args) throws Exception {
-
-        if(args.length <= 0) {
-            // For IDE based profiling ... submit topology to local cluster
-            Config conf = new Config();
-            final LocalCluster cluster = Helper.runOnLocalCluster(TOPOLOGY_NAME, getTopology(conf));
-
-            Helper.setupShutdownHook(cluster, TOPOLOGY_NAME);
-            while (true) {//  run indefinitely till Ctrl-C
-                Thread.sleep(20_000_000);
-            }
-
-        } else {
-            // For measuring perf against a Storm cluster
-            if (args.length > 2) {
-                System.err.println("args: runDurationSec  [optionalConfFile]");
-                return;
-            }
-            Integer durationSec = Integer.parseInt(args[0]);
-            Map topoConf =  (args.length==2) ? Utils.findAndReadConfigFile(args[1])  : new Config();
-
-            //  Submit topology to storm cluster
-            Helper.runOnClusterAndPrintMetrics(durationSec, TOPOLOGY_NAME, topoConf, getTopology(topoConf));
-        }
-    }
-
-}
-

http://git-wip-us.apache.org/repos/asf/storm/blob/2a369e40/storm-perf/src/main/java/org/apache/storm/perf/ConstSpoutOnlyTopo.java
----------------------------------------------------------------------
diff --git a/storm-perf/src/main/java/org/apache/storm/perf/ConstSpoutOnlyTopo.java b/storm-perf/src/main/java/org/apache/storm/perf/ConstSpoutOnlyTopo.java
deleted file mode 100755
index 721ae3d..0000000
--- a/storm-perf/src/main/java/org/apache/storm/perf/ConstSpoutOnlyTopo.java
+++ /dev/null
@@ -1,74 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License
- */
-
-package org.apache.storm.perf;
-
-import org.apache.storm.Config;
-import org.apache.storm.LocalCluster;
-import org.apache.storm.generated.StormTopology;
-import org.apache.storm.perf.spout.ConstSpout;
-import org.apache.storm.perf.utils.Helper;
-import org.apache.storm.topology.TopologyBuilder;
-
-
-/***
- * This topo helps measure how fast a spout can produce data (so no bolts are attached)
- *  Spout generates a stream of a fixed string.
- */
-
-public class ConstSpoutOnlyTopo {
-
-    public static final String TOPOLOGY_NAME = "ConstSpoutOnlyTopo";
-    public static final String SPOUT_ID = "constSpout";
-
-
-    public static StormTopology getTopology() {
-
-        // 1 -  Setup Const Spout   --------
-        ConstSpout spout = new ConstSpout("some data").withOutputFields("str");
-
-        // 2 - Setup Topology  --------
-        TopologyBuilder builder = new TopologyBuilder();
-        builder.setSpout(SPOUT_ID, spout, 1);
-        return builder.createTopology();
-    }
-
-    /**
-     * ConstSpout only topology  (No bolts)
-     */
-    public static void main(String[] args) throws Exception {
-        if(args.length <= 0) {
-            // For IDE based profiling ... submit topology to local cluster
-            LocalCluster cluster = Helper.runOnLocalCluster(TOPOLOGY_NAME, getTopology());
-
-            Helper.setupShutdownHook(cluster, TOPOLOGY_NAME);
-            while (true) {//  run indefinitely till Ctrl-C
-                Thread.sleep(20_000_000);
-            }
-        } else {
-            //  Submit topology to storm cluster
-            if (args.length != 1) {
-                System.err.println("args: runDurationSec");
-                return;
-            }
-            Integer durationSec = Integer.parseInt(args[0]);
-
-            Helper.runOnClusterAndPrintMetrics(durationSec, TOPOLOGY_NAME, new Config(), getTopology());
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/2a369e40/storm-perf/src/main/java/org/apache/storm/perf/FileReadWordCountTopo.java
----------------------------------------------------------------------
diff --git a/storm-perf/src/main/java/org/apache/storm/perf/FileReadWordCountTopo.java b/storm-perf/src/main/java/org/apache/storm/perf/FileReadWordCountTopo.java
deleted file mode 100644
index d518c86..0000000
--- a/storm-perf/src/main/java/org/apache/storm/perf/FileReadWordCountTopo.java
+++ /dev/null
@@ -1,96 +0,0 @@
-/*
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements.  See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership.  The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License.  You may obtain a copy of the License at
-*
-*     http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License
-*/
-package org.apache.storm.perf;
-
-
-import org.apache.storm.Config;
-import org.apache.storm.LocalCluster;
-import org.apache.storm.generated.StormTopology;
-import org.apache.storm.perf.bolt.CountBolt;
-import org.apache.storm.perf.bolt.SplitSentenceBolt;
-import org.apache.storm.perf.spout.FileReadSpout;
-import org.apache.storm.perf.utils.Helper;
-import org.apache.storm.topology.TopologyBuilder;
-import org.apache.storm.tuple.Fields;
-import org.apache.storm.utils.Utils;
-
-
-import java.util.Map;
-
-/***
- * This topo helps measure speed of word count.
- *  Spout loads a file into memory on initialization, then emits the lines in an endless loop.
- */
-
-public class FileReadWordCountTopo {
-    public static final String SPOUT_ID =   "spout";
-    public static final String COUNT_ID =   "counter";
-    public static final String SPLIT_ID =   "splitter";
-    public static final String TOPOLOGY_NAME = "FileReadWordCountTopo";
-
-    // Config settings
-    public static final String SPOUT_NUM =  "spout.count";
-    public static final String SPLIT_NUM =  "splitter.count";
-    public static final String COUNT_NUM =  "counter.count";
-    public static final String INPUT_FILE = "input.file";
-
-    public static final int DEFAULT_SPOUT_NUM = 1;
-    public static final int DEFAULT_SPLIT_BOLT_NUM = 2;
-    public static final int DEFAULT_COUNT_BOLT_NUM = 2;
-
-
-    public static StormTopology getTopology(Map config) {
-
-        final int spoutNum = Helper.getInt(config, SPOUT_NUM, DEFAULT_SPOUT_NUM);
-        final int spBoltNum = Helper.getInt(config, SPLIT_NUM, DEFAULT_SPLIT_BOLT_NUM);
-        final int cntBoltNum = Helper.getInt(config, COUNT_NUM, DEFAULT_COUNT_BOLT_NUM);
-        final String inputFile = Helper.getStr(config, INPUT_FILE);
-
-        TopologyBuilder builder = new TopologyBuilder();
-        builder.setSpout(SPOUT_ID, new FileReadSpout(inputFile), spoutNum);
-        builder.setBolt(SPLIT_ID, new SplitSentenceBolt(), spBoltNum).localOrShuffleGrouping(SPOUT_ID);
-        builder.setBolt(COUNT_ID, new CountBolt(), cntBoltNum).fieldsGrouping(SPLIT_ID, new Fields(SplitSentenceBolt.FIELDS));
-
-        return builder.createTopology();
-    }
-
-    public static void main(String[] args) throws Exception {
-        if(args.length <= 0) {
-            // For IDE based profiling ... submit topology to local cluster
-            Config conf = new Config();
-            conf.put(INPUT_FILE, "resources/randomwords.txt");
-            LocalCluster cluster = Helper.runOnLocalCluster(TOPOLOGY_NAME, getTopology(conf));
-
-            Helper.setupShutdownHook(cluster, TOPOLOGY_NAME);
-            while (true) {//  run indefinitely till Ctrl-C
-                Thread.sleep(20_000_000);
-            }
-        } else {
-            //  Submit to Storm cluster
-            if (args.length !=2) {
-                System.err.println("args: runDurationSec  confFile");
-                return;
-            }
-            Integer durationSec = Integer.parseInt(args[0]);
-            Map topoConf = Utils.findAndReadConfigFile(args[1]);
-
-            Helper.runOnClusterAndPrintMetrics(durationSec, TOPOLOGY_NAME, topoConf, getTopology(topoConf));
-
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/2a369e40/storm-perf/src/main/java/org/apache/storm/perf/HdfsSpoutNullBoltTopo.java
----------------------------------------------------------------------
diff --git a/storm-perf/src/main/java/org/apache/storm/perf/HdfsSpoutNullBoltTopo.java b/storm-perf/src/main/java/org/apache/storm/perf/HdfsSpoutNullBoltTopo.java
deleted file mode 100644
index 248b523..0000000
--- a/storm-perf/src/main/java/org/apache/storm/perf/HdfsSpoutNullBoltTopo.java
+++ /dev/null
@@ -1,101 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- *   or more contributor license agreements.  See the NOTICE file
- *   distributed with this work for additional information
- *   regarding copyright ownership.  The ASF licenses this file
- *   to you under the Apache License, Version 2.0 (the
- *   "License"); you may not use this file except in compliance
- *   with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- *   Unless required by applicable law or agreed to in writing, software
- *   distributed under the License is distributed on an "AS IS" BASIS,
- *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *   See the License for the specific language governing permissions and
- *   limitations under the License.
- */
-
-package org.apache.storm.perf;
-
-import org.apache.storm.generated.StormTopology;
-import org.apache.storm.hdfs.spout.HdfsSpout;
-import org.apache.storm.hdfs.spout.TextFileReader;
-import org.apache.storm.perf.bolt.DevNullBolt;
-import org.apache.storm.perf.utils.Helper;
-import org.apache.storm.topology.TopologyBuilder;
-import org.apache.storm.utils.Utils;
-
-import java.util.Map;
-
-/***
- * This topo helps measure speed of reading from Hdfs.
- *  Spout Reads from Hdfs.
- *  Bolt acks and discards tuples
- */
-
-
-public class HdfsSpoutNullBoltTopo {
-    // names
-    static final String TOPOLOGY_NAME = "HdfsSpoutNullBoltTopo";
-    static final String SPOUT_ID = "hdfsSpout";
-    static final String BOLT_ID = "devNullBolt";
-
-    // configs
-    static final String SPOUT_NUM = "spout.count";
-    static final String BOLT_NUM = "bolt.count";
-
-    static final String HDFS_URI    = "hdfs.uri";
-    static final String SOURCE_DIR  = "hdfs.source.dir";
-    static final String ARCHIVE_DIR = "hdfs.archive.dir";
-    static final String BAD_DIR     = "hdfs.bad.dir";
-
-    public static final int DEFAULT_SPOUT_NUM = 1;
-    public static final int DEFAULT_BOLT_NUM = 1;
-
-
-    public static StormTopology getTopology(Map config) {
-
-        final int spoutNum = Helper.getInt(config, SPOUT_NUM, DEFAULT_SPOUT_NUM);
-        final int boltNum = Helper.getInt(config, BOLT_NUM, DEFAULT_BOLT_NUM);
-        final String fileFormat = Helper.getStr(config, "text");
-        final String hdfsUri = Helper.getStr(config, HDFS_URI);
-        final String sourceDir = Helper.getStr(config, SOURCE_DIR);
-        final String archiveDir = Helper.getStr(config, ARCHIVE_DIR);
-        final String badDir = Helper.getStr(config, BAD_DIR);
-
-
-        // 1 -  Setup Hdfs Spout   --------
-        HdfsSpout spout = new HdfsSpout()
-                .setReaderType(fileFormat)
-                .setHdfsUri(hdfsUri)
-                .setSourceDir(sourceDir)
-                .setArchiveDir(archiveDir)
-                .setBadFilesDir(badDir)
-                .withOutputFields(TextFileReader.defaultFields);
-
-        // 2 -   DevNull Bolt   --------
-        DevNullBolt bolt = new DevNullBolt();
-
-        // 3 - Setup Topology  --------
-        TopologyBuilder builder = new TopologyBuilder();
-        builder.setSpout(SPOUT_ID, spout, spoutNum);
-        builder.setBolt(BOLT_ID, bolt, boltNum)
-                .localOrShuffleGrouping(SPOUT_ID);
-
-        return builder.createTopology();
-    }
-
-    public static void main(String[] args) throws Exception {
-        if (args.length != 2) {
-            System.err.println("args: runDurationSec topConfFile");
-            return;
-        }
-
-        Integer durationSec = Integer.parseInt(args[0]);
-        Map topoConf = Utils.findAndReadConfigFile(args[1]);
-
-        // Submit to Storm cluster
-        Helper.runOnClusterAndPrintMetrics(durationSec, TOPOLOGY_NAME, topoConf, getTopology(topoConf));
-    }
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/2a369e40/storm-perf/src/main/java/org/apache/storm/perf/KafkaHdfsTopo.java
----------------------------------------------------------------------
diff --git a/storm-perf/src/main/java/org/apache/storm/perf/KafkaHdfsTopo.java b/storm-perf/src/main/java/org/apache/storm/perf/KafkaHdfsTopo.java
deleted file mode 100755
index 4293aac..0000000
--- a/storm-perf/src/main/java/org/apache/storm/perf/KafkaHdfsTopo.java
+++ /dev/null
@@ -1,168 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License
- */
-
-package org.apache.storm.perf;
-
-import org.apache.storm.generated.StormTopology;
-import org.apache.storm.hdfs.bolt.HdfsBolt;
-import org.apache.storm.hdfs.bolt.format.DefaultFileNameFormat;
-import org.apache.storm.hdfs.bolt.format.FileNameFormat;
-import org.apache.storm.hdfs.bolt.format.RecordFormat;
-import org.apache.storm.hdfs.bolt.rotation.FileRotationPolicy;
-import org.apache.storm.hdfs.bolt.rotation.FileSizeRotationPolicy;
-import org.apache.storm.hdfs.bolt.sync.CountSyncPolicy;
-import org.apache.storm.hdfs.bolt.sync.SyncPolicy;
-import org.apache.storm.kafka.BrokerHosts;
-import org.apache.storm.kafka.KafkaSpout;
-import org.apache.storm.kafka.SpoutConfig;
-import org.apache.storm.kafka.StringMultiSchemeWithTopic;
-import org.apache.storm.kafka.ZkHosts;
-import org.apache.storm.perf.utils.Helper;
-import org.apache.storm.topology.TopologyBuilder;
-import org.apache.storm.tuple.Tuple;
-import org.apache.storm.utils.Utils;
-
-import java.util.Map;
-import java.util.UUID;
-
-/***
- * This topo helps measure speed of reading from Kafka and writing to Hdfs.
- *  Spout Reads from Kafka.
- *  Bolt writes to Hdfs
- */
-
-public class KafkaHdfsTopo {
-
-  // configs - topo parallelism
-  public static final String SPOUT_NUM = "spout.count";
-  public static final String BOLT_NUM = "bolt.count";
-  // configs - kafka spout
-  public static final String KAFKA_TOPIC = "kafka.topic";
-  public static final String ZOOKEEPER_URI = "zk.uri";
-  // configs - hdfs bolt
-  public static final String HDFS_URI = "hdfs.uri";
-  public static final String HDFS_PATH = "hdfs.dir";
-  public static final String HDFS_BATCH = "hdfs.batch";
-
-
-  public static final int DEFAULT_SPOUT_NUM = 1;
-  public static final int DEFAULT_BOLT_NUM = 1;
-  public static final int DEFAULT_HDFS_BATCH = 1000;
-
-  // names
-  public static final String TOPOLOGY_NAME = "KafkaHdfsTopo";
-  public static final String SPOUT_ID = "kafkaSpout";
-  public static final String BOLT_ID = "hdfsBolt";
-
-
-
-  public static StormTopology getTopology(Map config) {
-
-    final int spoutNum = getInt(config, SPOUT_NUM, DEFAULT_SPOUT_NUM);
-    final int boltNum = getInt(config, BOLT_NUM, DEFAULT_BOLT_NUM);
-
-    final int hdfsBatch = getInt(config, HDFS_BATCH, DEFAULT_HDFS_BATCH);
-
-    // 1 -  Setup Kafka Spout   --------
-    String zkConnString = getStr(config, ZOOKEEPER_URI);
-    String topicName = getStr(config, KAFKA_TOPIC);
-
-    BrokerHosts brokerHosts = new ZkHosts(zkConnString);
-    SpoutConfig spoutConfig = new SpoutConfig(brokerHosts, topicName, "/" + topicName, UUID.randomUUID().toString());
-    spoutConfig.scheme = new StringMultiSchemeWithTopic();
-    spoutConfig.ignoreZkOffsets = true;
-
-    KafkaSpout spout = new KafkaSpout(spoutConfig);
-
-    // 2 -  Setup HFS Bolt   --------
-    String Hdfs_url = getStr(config, HDFS_URI);
-    RecordFormat format = new LineWriter("str");
-    SyncPolicy syncPolicy = new CountSyncPolicy(hdfsBatch);
-    FileRotationPolicy rotationPolicy = new FileSizeRotationPolicy(1.0f, FileSizeRotationPolicy.Units.GB);
-
-    FileNameFormat fileNameFormat = new DefaultFileNameFormat().withPath(getStr(config,HDFS_PATH) );
-
-    // Instantiate the HdfsBolt
-    HdfsBolt bolt = new HdfsBolt()
-            .withFsUrl(Hdfs_url)
-            .withFileNameFormat(fileNameFormat)
-            .withRecordFormat(format)
-            .withRotationPolicy(rotationPolicy)
-            .withSyncPolicy(syncPolicy);
-
-
-    // 3 - Setup Topology  --------
-    TopologyBuilder builder = new TopologyBuilder();
-    builder.setSpout(SPOUT_ID, spout, spoutNum);
-    builder.setBolt(BOLT_ID, bolt, boltNum)
-            .localOrShuffleGrouping(SPOUT_ID);
-
-    return builder.createTopology();
-  }
-
-
-  public static int getInt(Map map, Object key, int def) {
-    return Utils.getInt(Utils.get(map, key, def));
-  }
-
-  public static String getStr(Map map, Object key) {
-    return (String) map.get(key);
-  }
-
-
-    /** Copies text file content from sourceDir to destinationDir. Moves source files into sourceDir after its done consuming */
-    public static void main(String[] args) throws Exception {
-
-        if (args.length != 2) {
-            System.err.println("args: runDurationSec topConfFile");
-            return;
-        }
-
-        Integer durationSec = Integer.parseInt(args[0]);
-        String confFile = args[1];
-        Map topoConf = Utils.findAndReadConfigFile(confFile);
-
-        //  Submit topology to Storm cluster
-        Helper.runOnClusterAndPrintMetrics(durationSec, TOPOLOGY_NAME, topoConf, getTopology(topoConf));
-    }
-
-    public static class LineWriter implements RecordFormat {
-        private String lineDelimiter = System.lineSeparator();
-        private String fieldName;
-
-        public LineWriter(String fieldName) {
-            this.fieldName = fieldName;
-        }
-
-        /**
-         * Overrides the default record delimiter.
-         *
-         * @param delimiter
-         * @return
-         */
-        public LineWriter withLineDelimiter(String delimiter){
-            this.lineDelimiter = delimiter;
-            return this;
-        }
-
-        @Override
-        public byte[] format(Tuple tuple) {
-            return (tuple.getValueByField(fieldName).toString() +  this.lineDelimiter).getBytes();
-        }
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/storm/blob/2a369e40/storm-perf/src/main/java/org/apache/storm/perf/KafkaSpoutNullBoltTopo.java
----------------------------------------------------------------------
diff --git a/storm-perf/src/main/java/org/apache/storm/perf/KafkaSpoutNullBoltTopo.java b/storm-perf/src/main/java/org/apache/storm/perf/KafkaSpoutNullBoltTopo.java
deleted file mode 100755
index 3512c65..0000000
--- a/storm-perf/src/main/java/org/apache/storm/perf/KafkaSpoutNullBoltTopo.java
+++ /dev/null
@@ -1,114 +0,0 @@
-/*
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements.  See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership.  The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License.  You may obtain a copy of the License at
-*
-*     http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License
-*/
-
-package org.apache.storm.perf;
-
-import org.apache.storm.generated.StormTopology;
-import org.apache.storm.kafka.BrokerHosts;
-import org.apache.storm.kafka.KafkaSpout;
-import org.apache.storm.kafka.SpoutConfig;
-import org.apache.storm.kafka.StringMultiSchemeWithTopic;
-import org.apache.storm.kafka.ZkHosts;
-import org.apache.storm.perf.bolt.DevNullBolt;
-import org.apache.storm.perf.utils.Helper;
-import org.apache.storm.topology.TopologyBuilder;
-import org.apache.storm.utils.Utils;
-
-import java.util.Map;
-import java.util.UUID;
-
-
-/***
- * This topo helps measure speed of reading from Kafka
- *   Spout Reads from Kafka.
- *   Bolt acks and discards tuples
- */
-
-public class KafkaSpoutNullBoltTopo {
-
-    // configs - topo parallelism
-    public static final String SPOUT_NUM = "spout.count";
-    public static final String BOLT_NUM = "bolt.count";
-
-    // configs - kafka spout
-    public static final String KAFKA_TOPIC = "kafka.topic";
-    public static final String ZOOKEEPER_URI = "zk.uri";
-
-
-    public static final int DEFAULT_SPOUT_NUM = 1;
-    public static final int DEFAULT_BOLT_NUM = 1;
-
-    // names
-    public static final String TOPOLOGY_NAME = "KafkaSpoutNullBoltTopo";
-    public static final String SPOUT_ID = "kafkaSpout";
-    public static final String BOLT_ID = "devNullBolt";
-
-
-    public static StormTopology getTopology(Map config) {
-
-        final int spoutNum = getInt(config, SPOUT_NUM, DEFAULT_SPOUT_NUM);
-        final int boltNum = getInt(config, BOLT_NUM, DEFAULT_BOLT_NUM);
-        // 1 -  Setup Kafka Spout   --------
-
-        String zkConnString = getStr(config, ZOOKEEPER_URI);
-        String topicName = getStr(config, KAFKA_TOPIC);
-
-        BrokerHosts brokerHosts = new ZkHosts(zkConnString);
-        SpoutConfig spoutConfig = new SpoutConfig(brokerHosts, topicName, "/" + topicName, UUID.randomUUID().toString());
-        spoutConfig.scheme = new StringMultiSchemeWithTopic();
-        spoutConfig.ignoreZkOffsets = true;
-
-        KafkaSpout spout = new KafkaSpout(spoutConfig);
-
-        // 2 -   DevNull Bolt   --------
-        DevNullBolt bolt = new DevNullBolt();
-
-        // 3 - Setup Topology  --------
-        TopologyBuilder builder = new TopologyBuilder();
-        builder.setSpout(SPOUT_ID, spout, spoutNum);
-        builder.setBolt(BOLT_ID, bolt, boltNum)
-                .localOrShuffleGrouping(SPOUT_ID);
-
-        return builder.createTopology();
-    }
-
-
-    public static int getInt(Map map, Object key, int def) {
-        return Utils.getInt(Utils.get(map, key, def));
-    }
-
-    public static String getStr(Map map, Object key) {
-        return (String) map.get(key);
-    }
-
-
-    /**
-     * Copies text file content from sourceDir to destinationDir. Moves source files into sourceDir after its done consuming
-     */
-    public static void main(String[] args) throws Exception {
-        if (args.length !=2) {
-            System.err.println("args: runDurationSec confFile");
-            return;
-        }
-        Integer durationSec = Integer.parseInt(args[0]);
-        Map topoConf = Utils.findAndReadConfigFile(args[1]);
-
-        //  Submit to Storm cluster
-        Helper.runOnClusterAndPrintMetrics(durationSec, TOPOLOGY_NAME, topoConf, getTopology(topoConf));
-    }
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/2a369e40/storm-perf/src/main/java/org/apache/storm/perf/StrGenSpoutHdfsBoltTopo.java
----------------------------------------------------------------------
diff --git a/storm-perf/src/main/java/org/apache/storm/perf/StrGenSpoutHdfsBoltTopo.java b/storm-perf/src/main/java/org/apache/storm/perf/StrGenSpoutHdfsBoltTopo.java
deleted file mode 100755
index 5b97540..0000000
--- a/storm-perf/src/main/java/org/apache/storm/perf/StrGenSpoutHdfsBoltTopo.java
+++ /dev/null
@@ -1,154 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License
- */
-
-
-package org.apache.storm.perf;
-
-import org.apache.storm.LocalCluster;
-import org.apache.storm.generated.StormTopology;
-import org.apache.storm.hdfs.bolt.HdfsBolt;
-import org.apache.storm.hdfs.bolt.format.DefaultFileNameFormat;
-import org.apache.storm.hdfs.bolt.format.FileNameFormat;
-import org.apache.storm.hdfs.bolt.format.RecordFormat;
-import org.apache.storm.hdfs.bolt.rotation.FileRotationPolicy;
-import org.apache.storm.hdfs.bolt.rotation.FileSizeRotationPolicy;
-import org.apache.storm.hdfs.bolt.sync.CountSyncPolicy;
-import org.apache.storm.hdfs.bolt.sync.SyncPolicy;
-import org.apache.storm.perf.spout.StringGenSpout;
-import org.apache.storm.perf.utils.Helper;
-import org.apache.storm.topology.TopologyBuilder;
-import org.apache.storm.tuple.Tuple;
-import org.apache.storm.utils.Utils;
-
-import java.util.Map;
-
-/***
- * This topo helps measure speed of writing to Hdfs
- *  Spout generates fixed length random strings.
- *  Bolt writes to Hdfs
- */
-
-public class StrGenSpoutHdfsBoltTopo {
-
-    // configs - topo parallelism
-    public static final String SPOUT_NUM = "spout.count";
-    public static final String BOLT_NUM =  "bolt.count";
-
-    // configs - hdfs bolt
-    public static final String HDFS_URI   = "hdfs.uri";
-    public static final String HDFS_PATH  = "hdfs.dir";
-    public static final String HDFS_BATCH = "hdfs.batch";
-
-    public static final int DEFAULT_SPOUT_NUM = 1;
-    public static final int DEFAULT_BOLT_NUM = 1;
-    public static final int DEFAULT_HDFS_BATCH = 1000;
-
-    // names
-    public static final String TOPOLOGY_NAME = "StrGenSpoutHdfsBoltTopo";
-    public static final String SPOUT_ID = "GenSpout";
-    public static final String BOLT_ID = "hdfsBolt";
-
-
-    public static StormTopology getTopology(Map topoConf) {
-        final int hdfsBatch = Helper.getInt(topoConf, HDFS_BATCH, DEFAULT_HDFS_BATCH);
-
-        // 1 -  Setup StringGen Spout   --------
-        StringGenSpout spout = new StringGenSpout(100).withFieldName("str");
-
-
-        // 2 -  Setup HFS Bolt   --------
-        String Hdfs_url = Helper.getStr(topoConf, HDFS_URI);
-        RecordFormat format = new LineWriter("str");
-        SyncPolicy syncPolicy = new CountSyncPolicy(hdfsBatch);
-        FileRotationPolicy rotationPolicy = new FileSizeRotationPolicy(1.0f, FileSizeRotationPolicy.Units.GB);
-        final int spoutNum = Helper.getInt(topoConf, SPOUT_NUM, DEFAULT_SPOUT_NUM);
-        final int boltNum = Helper.getInt(topoConf, BOLT_NUM, DEFAULT_BOLT_NUM);
-
-        // Use default, Storm-generated file names
-        FileNameFormat fileNameFormat = new DefaultFileNameFormat().withPath(Helper.getStr(topoConf, HDFS_PATH) );
-
-        // Instantiate the HdfsBolt
-        HdfsBolt bolt = new HdfsBolt()
-                .withFsUrl(Hdfs_url)
-                .withFileNameFormat(fileNameFormat)
-                .withRecordFormat(format)
-                .withRotationPolicy(rotationPolicy)
-                .withSyncPolicy(syncPolicy);
-
-
-        // 3 - Setup Topology  --------
-
-        TopologyBuilder builder = new TopologyBuilder();
-        builder.setSpout(SPOUT_ID, spout, spoutNum);
-        builder.setBolt(BOLT_ID, bolt, boltNum)
-                .localOrShuffleGrouping(SPOUT_ID);
-
-        return builder.createTopology();
-    }
-
-
-    /** Spout generates random strings and HDFS bolt writes them to a text file */
-    public static void main(String[] args) throws Exception {
-        if(args.length <= 0) {
-            // submit to local cluster
-            Map topoConf = Utils.findAndReadConfigFile("conf/HdfsSpoutTopo.yaml");
-            LocalCluster cluster = Helper.runOnLocalCluster(TOPOLOGY_NAME, getTopology(topoConf));
-
-            Helper.setupShutdownHook(cluster, TOPOLOGY_NAME);
-            while (true) {//  run indefinitely till Ctrl-C
-                Thread.sleep(20_000_000);
-            }
-        } else {
-            //  Submit to Storm cluster
-            if (args.length !=2) {
-                System.err.println("args: runDurationSec confFile");
-                return;
-            }
-            Integer durationSec = Integer.parseInt(args[0]);
-            Map topoConf = Utils.findAndReadConfigFile(args[1]);
-
-            Helper.runOnClusterAndPrintMetrics(durationSec, TOPOLOGY_NAME, topoConf, getTopology(topoConf));
-        }
-    }
-
-
-    public static class LineWriter implements RecordFormat {
-        private String lineDelimiter = System.lineSeparator();
-        private String fieldName;
-
-        public LineWriter(String fieldName) {
-            this.fieldName = fieldName;
-        }
-
-        /**
-         * Overrides the default record delimiter.
-         *
-         * @param delimiter
-         * @return
-         */
-        public LineWriter withLineDelimiter(String delimiter){
-            this.lineDelimiter = delimiter;
-            return this;
-        }
-
-        public byte[] format(Tuple tuple) {
-            return (tuple.getValueByField(fieldName).toString() +  this.lineDelimiter).getBytes();
-        }
-    }
-
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/storm/blob/2a369e40/storm-perf/src/main/java/org/apache/storm/perf/bolt/CountBolt.java
----------------------------------------------------------------------
diff --git a/storm-perf/src/main/java/org/apache/storm/perf/bolt/CountBolt.java b/storm-perf/src/main/java/org/apache/storm/perf/bolt/CountBolt.java
deleted file mode 100644
index b79a0ee..0000000
--- a/storm-perf/src/main/java/org/apache/storm/perf/bolt/CountBolt.java
+++ /dev/null
@@ -1,58 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License
- */
-
-package org.apache.storm.perf.bolt;
-
-
-import org.apache.storm.task.TopologyContext;
-import org.apache.storm.topology.BasicOutputCollector;
-import org.apache.storm.topology.OutputFieldsDeclarer;
-import org.apache.storm.topology.base.BaseBasicBolt;
-import org.apache.storm.tuple.Fields;
-import org.apache.storm.tuple.Tuple;
-import org.apache.storm.tuple.Values;
-
-import java.util.HashMap;
-import java.util.Map;
-
-public class CountBolt extends BaseBasicBolt {
-    public static final String FIELDS_WORD = "word";
-    public static final String FIELDS_COUNT = "count";
-
-    Map<String, Integer> counts = new HashMap<>();
-
-    @Override
-    public void prepare(Map stormConf, TopologyContext context) {
-    }
-
-    @Override
-    public void execute(Tuple tuple, BasicOutputCollector collector) {
-        String word = tuple.getString(0);
-        Integer count = counts.get(word);
-        if (count == null)
-            count = 0;
-        count++;
-        counts.put(word, count);
-        collector.emit(new Values(word, count));
-    }
-
-    @Override
-    public void declareOutputFields(OutputFieldsDeclarer declarer) {
-        declarer.declare(new Fields(FIELDS_WORD, FIELDS_COUNT));
-    }
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/2a369e40/storm-perf/src/main/java/org/apache/storm/perf/bolt/DevNullBolt.java
----------------------------------------------------------------------
diff --git a/storm-perf/src/main/java/org/apache/storm/perf/bolt/DevNullBolt.java b/storm-perf/src/main/java/org/apache/storm/perf/bolt/DevNullBolt.java
deleted file mode 100755
index b85ce15..0000000
--- a/storm-perf/src/main/java/org/apache/storm/perf/bolt/DevNullBolt.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License
- */
-
-package org.apache.storm.perf.bolt;
-
-import org.apache.storm.task.OutputCollector;
-import org.apache.storm.task.TopologyContext;
-import org.apache.storm.topology.OutputFieldsDeclarer;
-import org.apache.storm.topology.base.BaseRichBolt;
-import org.apache.storm.tuple.Tuple;
-
-import java.util.Map;
-
-
-public class DevNullBolt extends BaseRichBolt {
-    private OutputCollector collector;
-
-    @Override
-    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
-        this.collector = collector;
-    }
-
-    @Override
-    public void execute(Tuple tuple) {
-        collector.ack(tuple);
-    }
-
-    @Override
-    public void declareOutputFields(OutputFieldsDeclarer declarer) {
-
-    }
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/2a369e40/storm-perf/src/main/java/org/apache/storm/perf/bolt/IdBolt.java
----------------------------------------------------------------------
diff --git a/storm-perf/src/main/java/org/apache/storm/perf/bolt/IdBolt.java b/storm-perf/src/main/java/org/apache/storm/perf/bolt/IdBolt.java
deleted file mode 100644
index 116265e..0000000
--- a/storm-perf/src/main/java/org/apache/storm/perf/bolt/IdBolt.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License
- */
-
-package org.apache.storm.perf.bolt;
-
-import org.apache.storm.task.OutputCollector;
-import org.apache.storm.task.TopologyContext;
-import org.apache.storm.topology.OutputFieldsDeclarer;
-import org.apache.storm.topology.base.BaseRichBolt;
-import org.apache.storm.tuple.Fields;
-import org.apache.storm.tuple.Tuple;
-import org.apache.storm.tuple.Values;
-
-import java.util.Map;
-
-public class IdBolt extends BaseRichBolt {
-    private OutputCollector collector;
-
-    @Override
-    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
-        this.collector = collector;
-    }
-
-    @Override
-    public void execute(Tuple tuple) {
-        collector.emit(tuple, new Values( tuple.getValues() ) );
-        collector.ack(tuple);
-    }
-
-    @Override
-    public void declareOutputFields(OutputFieldsDeclarer declarer) {
-        declarer.declare(new Fields("field1"));
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/storm/blob/2a369e40/storm-perf/src/main/java/org/apache/storm/perf/bolt/SplitSentenceBolt.java
----------------------------------------------------------------------
diff --git a/storm-perf/src/main/java/org/apache/storm/perf/bolt/SplitSentenceBolt.java b/storm-perf/src/main/java/org/apache/storm/perf/bolt/SplitSentenceBolt.java
deleted file mode 100644
index 96f9f73..0000000
--- a/storm-perf/src/main/java/org/apache/storm/perf/bolt/SplitSentenceBolt.java
+++ /dev/null
@@ -1,58 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License
- */
-
-package org.apache.storm.perf.bolt;
-
-import org.apache.storm.task.TopologyContext;
-import org.apache.storm.topology.BasicOutputCollector;
-import org.apache.storm.topology.OutputFieldsDeclarer;
-import org.apache.storm.topology.base.BaseBasicBolt;
-import org.apache.storm.tuple.Fields;
-import org.apache.storm.tuple.Tuple;
-import org.apache.storm.tuple.Values;
-
-import java.util.Map;
-
-
-public class SplitSentenceBolt extends BaseBasicBolt {
-    public static final String FIELDS = "word";
-
-    @Override
-    public void prepare(Map stormConf, TopologyContext context) {
-    }
-
-    @Override
-    public void execute(Tuple input, BasicOutputCollector collector) {
-        for (String word : splitSentence(input.getString(0))) {
-            collector.emit(new Values(word));
-        }
-    }
-
-    @Override
-    public void declareOutputFields(OutputFieldsDeclarer declarer) {
-        declarer.declare(new Fields(FIELDS));
-    }
-
-
-    public static String[] splitSentence(String sentence) {
-        if (sentence != null) {
-            return sentence.split("\\s+");
-        }
-        return null;
-    }
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/2a369e40/storm-perf/src/main/java/org/apache/storm/perf/spout/ConstSpout.java
----------------------------------------------------------------------
diff --git a/storm-perf/src/main/java/org/apache/storm/perf/spout/ConstSpout.java b/storm-perf/src/main/java/org/apache/storm/perf/spout/ConstSpout.java
deleted file mode 100755
index b66e4f3..0000000
--- a/storm-perf/src/main/java/org/apache/storm/perf/spout/ConstSpout.java
+++ /dev/null
@@ -1,70 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License
- */
-
-package org.apache.storm.perf.spout;
-
-
-import org.apache.storm.spout.SpoutOutputCollector;
-import org.apache.storm.task.TopologyContext;
-import org.apache.storm.topology.OutputFieldsDeclarer;
-import org.apache.storm.topology.base.BaseRichSpout;
-import org.apache.storm.tuple.Fields;
-
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-
-public class ConstSpout extends BaseRichSpout {
-
-    private static final String DEFAUT_FIELD_NAME = "str";
-    private String value;
-    private String fieldName = DEFAUT_FIELD_NAME;
-    private SpoutOutputCollector collector = null;
-    private int count=0;
-
-    public ConstSpout(String value) {
-        this.value = value;
-    }
-
-    public ConstSpout withOutputFields(String fieldName) {
-        this.fieldName = fieldName;
-        return this;
-    }
-
-    @Override
-    public void declareOutputFields(OutputFieldsDeclarer declarer) {
-        declarer.declare(new Fields(fieldName));
-    }
-
-    @Override
-    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
-        this.collector = collector;
-    }
-
-    @Override
-    public void nextTuple() {
-        List<Object> tuple = Collections.singletonList((Object) value);
-        collector.emit(tuple, count++);
-    }
-
-    @Override
-    public void ack(Object msgId) {
-        super.ack(msgId);
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/2a369e40/storm-perf/src/main/java/org/apache/storm/perf/spout/FileReadSpout.java
----------------------------------------------------------------------
diff --git a/storm-perf/src/main/java/org/apache/storm/perf/spout/FileReadSpout.java b/storm-perf/src/main/java/org/apache/storm/perf/spout/FileReadSpout.java
deleted file mode 100644
index 959e7c6..0000000
--- a/storm-perf/src/main/java/org/apache/storm/perf/spout/FileReadSpout.java
+++ /dev/null
@@ -1,141 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License
- */
-
-package org.apache.storm.perf.spout;
-
-import org.apache.storm.spout.SpoutOutputCollector;
-import org.apache.storm.task.TopologyContext;
-import org.apache.storm.topology.OutputFieldsDeclarer;
-import org.apache.storm.topology.base.BaseRichSpout;
-import org.apache.storm.tuple.Fields;
-import org.apache.storm.tuple.Values;
-
-import java.io.BufferedReader;
-import java.io.FileInputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.InputStreamReader;
-import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-
-public class FileReadSpout extends BaseRichSpout {
-    public static final String FIELDS = "sentence";
-    private static final long serialVersionUID = -2582705611472467172L;
-    private transient FileReader reader;
-    private String file;
-    private boolean ackEnabled = true;
-    private SpoutOutputCollector collector;
-
-    private long count = 0;
-
-
-    public FileReadSpout(String file) {
-        this.file = file;
-    }
-
-    // For testing
-    FileReadSpout(FileReader reader) {
-        this.reader = reader;
-    }
-
-    @Override
-    public void open(Map conf, TopologyContext context,
-                     SpoutOutputCollector collector) {
-        this.collector = collector;
-        Object ackObj = conf.get("topology.acker.executors");
-        if (ackObj != null && ackObj.equals(0)) {
-            this.ackEnabled = false;
-        }
-        // for tests, reader will not be null
-        if (this.reader == null) {
-            this.reader = new FileReader(this.file);
-        }
-    }
-
-    @Override
-    public void nextTuple() {
-        if (ackEnabled) {
-            collector.emit(new Values(reader.nextLine()), count);
-            count++;
-        } else {
-            collector.emit(new Values(reader.nextLine()));
-        }
-    }
-
-    @Override
-    public void declareOutputFields(OutputFieldsDeclarer declarer) {
-        declarer.declare(new Fields(FIELDS));
-    }
-
-    public static List<String> readLines(InputStream input) {
-        List<String> lines = new ArrayList<>();
-        try {
-            BufferedReader reader = new BufferedReader(new InputStreamReader(input));
-            try {
-                String line;
-                while ((line = reader.readLine()) != null) {
-                    lines.add(line);
-                }
-            } catch (IOException e) {
-                throw new RuntimeException("Reading file failed", e);
-            } finally {
-                reader.close();
-            }
-        } catch (IOException e) {
-            throw new RuntimeException("Error closing reader", e);
-        }
-        return lines;
-    }
-
-    public static class FileReader implements Serializable {
-
-        private static final long serialVersionUID = -7012334600647556267L;
-
-        public final String file;
-        private List<String> contents = null;
-        private int index = 0;
-        private int limit = 0;
-
-        public FileReader(String file) {
-            this.file = file;
-            if (this.file != null) {
-                try {
-                    this.contents = readLines(new FileInputStream(this.file));
-                } catch (IOException e) {
-                    e.printStackTrace();
-                    throw new IllegalArgumentException("Cannot open file " + file, e);
-                }
-                this.limit = contents.size();
-            } else {
-                throw new IllegalArgumentException("file name cannot be null");
-            }
-        }
-
-        public String nextLine() {
-            if (index >= limit) {
-                index = 0;
-            }
-            String line = contents.get(index);
-            index++;
-            return line;
-        }
-
-    }
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/2a369e40/storm-perf/src/main/java/org/apache/storm/perf/spout/StringGenSpout.java
----------------------------------------------------------------------
diff --git a/storm-perf/src/main/java/org/apache/storm/perf/spout/StringGenSpout.java b/storm-perf/src/main/java/org/apache/storm/perf/spout/StringGenSpout.java
deleted file mode 100755
index f9c665b..0000000
--- a/storm-perf/src/main/java/org/apache/storm/perf/spout/StringGenSpout.java
+++ /dev/null
@@ -1,93 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License
- */
-
-package org.apache.storm.perf.spout;
-
-
-import org.apache.commons.lang.RandomStringUtils;
-import org.apache.storm.spout.SpoutOutputCollector;
-import org.apache.storm.task.TopologyContext;
-import org.apache.storm.topology.OutputFieldsDeclarer;
-import org.apache.storm.topology.base.BaseRichSpout;
-import org.apache.storm.tuple.Fields;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-
-/** Spout pre-computes a list with 30k fixed length random strings.
- *  Emits sequentially from this list, over and over again.
- */
-
-public class StringGenSpout extends BaseRichSpout {
-
-    private static final String DEFAULT_FIELD_NAME = "str";
-    private int strLen;
-    private final int strCount = 30_000;
-    private String fieldName = DEFAULT_FIELD_NAME;
-    private SpoutOutputCollector collector = null;
-    ArrayList<String> records;
-    private int curr=0;
-    private int count=0;
-
-    public StringGenSpout(int strLen) {
-        this.strLen = strLen;
-    }
-
-    public StringGenSpout withFieldName(String fieldName) {
-        this.fieldName = fieldName;
-        return this;
-    }
-
-    @Override
-    public void declareOutputFields(OutputFieldsDeclarer declarer) {
-        declarer.declare( new Fields(fieldName) );
-    }
-
-    @Override
-    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
-        this.records = genStringList(strLen, strCount);
-
-        this.collector = collector;
-    }
-
-    private static ArrayList<String> genStringList(int strLen, int count) {
-        ArrayList<String> result = new ArrayList<String>(count);
-        for (int i = 0; i < count; i++) {
-            result.add( RandomStringUtils.random(strLen) );
-        }
-        return result;
-    }
-
-    @Override
-    public void nextTuple() {
-        List<Object> tuple;
-        if( curr < strCount ) {
-            tuple = Collections.singletonList((Object) records.get(curr));
-            ++curr;
-            collector.emit(tuple, ++count);
-        }
-    }
-
-
-    @Override
-    public void ack(Object msgId) {
-        super.ack(msgId);
-    }
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/2a369e40/storm-perf/src/main/java/org/apache/storm/perf/utils/BasicMetricsCollector.java
----------------------------------------------------------------------
diff --git a/storm-perf/src/main/java/org/apache/storm/perf/utils/BasicMetricsCollector.java b/storm-perf/src/main/java/org/apache/storm/perf/utils/BasicMetricsCollector.java
deleted file mode 100755
index 686f9da..0000000
--- a/storm-perf/src/main/java/org/apache/storm/perf/utils/BasicMetricsCollector.java
+++ /dev/null
@@ -1,309 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License
- */
-
-package org.apache.storm.perf.utils;
-
-import org.apache.storm.LocalCluster;
-import org.apache.storm.generated.Nimbus;
-import org.apache.storm.utils.NimbusClient;
-import org.apache.storm.utils.Utils;
-import org.apache.log4j.Logger;
-
-import java.io.PrintWriter;
-import java.util.*;
-
-
-public class BasicMetricsCollector  {
-
-    private LocalCluster localCluster = null;
-    private Nimbus.Client client = null;
-    private PrintWriter dataWriter;
-    private long startTime=0;
-
-    public enum MetricsItem {
-        TOPOLOGY_STATS,
-        XSFER_RATE,
-        SPOUT_THROUGHPUT,
-        SPOUT_LATENCY,
-        ALL
-    }
-
-
-    /* headers */
-    public static final String TIME = "elapsed (sec)";
-    public static final String TIME_FORMAT = "%d";
-    public static final String TOTAL_SLOTS = "total_slots";
-    public static final String USED_SLOTS = "used_slots";
-    public static final String WORKERS = "workers";
-    public static final String TASKS = "tasks";
-    public static final String EXECUTORS = "executors";
-    public static final String TRANSFERRED = "transferred (messages)";
-    public static final String XSFER_RATE = "transfer rate (messages/s)";
-    public static final String SPOUT_EXECUTORS = "spout_executors";
-    public static final String SPOUT_TRANSFERRED = "spout_transferred (messages)";
-    public static final String SPOUT_ACKED = "spout_acks";
-    public static final String SPOUT_THROUGHPUT = "spout_throughput (acks/s)";
-    public static final String SPOUT_AVG_COMPLETE_LATENCY = "spout_avg_complete_latency(ms)";
-    public static final String SPOUT_AVG_LATENCY_FORMAT = "%.1f";
-    public static final String SPOUT_MAX_COMPLETE_LATENCY = "spout_max_complete_latency(ms)";
-    public static final String SPOUT_MAX_LATENCY_FORMAT = "%.1f";
-    private static final Logger LOG = Logger.getLogger(BasicMetricsCollector.class);
-    final MetricsCollectorConfig config;
-    //    final StormTopology topology;
-    final Set<String> header = new LinkedHashSet<String>();
-    final Map<String, String> metrics = new HashMap<String, String>();
-    int lineNumber = 0;
-
-    final boolean collectTopologyStats;
-    final boolean collectExecutorStats;
-    final boolean collectThroughput;
-
-    final boolean collectSpoutThroughput;
-    final boolean collectSpoutLatency;
-
-    private MetricsSample lastSample;
-    private MetricsSample curSample;
-    private double maxLatency = 0;
-
-    boolean first = true;
-
-    public BasicMetricsCollector(Nimbus.Client client, String topoName, Map stormConfig) {
-        this(topoName, stormConfig);
-        this.client = client;
-        this.localCluster = null;
-    }
-
-    public BasicMetricsCollector(LocalCluster localCluster, String topoName, Map stormConfig) {
-        this(topoName, stormConfig);
-        this.client = null;
-        this.localCluster = localCluster;
-    }
-
-    private BasicMetricsCollector(String topoName, Map stormConfig) {
-        Set<MetricsItem> items = getMetricsToCollect();
-        this.config = new MetricsCollectorConfig(topoName, stormConfig);
-        collectTopologyStats = collectTopologyStats(items);
-        collectExecutorStats = collectExecutorStats(items);
-        collectThroughput = collectThroughput(items);
-        collectSpoutThroughput = collectSpoutThroughput(items);
-        collectSpoutLatency = collectSpoutLatency(items);
-        dataWriter = new PrintWriter(System.err);
-    }
-
-
-    private Set<MetricsItem>  getMetricsToCollect() {
-        Set<MetricsItem> result = new HashSet<>();
-        result.add(MetricsItem.ALL);
-        return result;
-    }
-
-    public void collect(Nimbus.Client client) {
-        try {
-            if (!first) {
-                this.lastSample = this.curSample;
-                this.curSample = MetricsSample.factory(client, config.name);
-                updateStats(dataWriter);
-                writeLine(dataWriter);
-            } else {
-                LOG.info("Getting baseline metrics sample.");
-                writeHeader(dataWriter);
-                this.curSample = MetricsSample.factory(client, config.name);
-                first = false;
-                startTime = System.currentTimeMillis();
-            }
-        } catch (Exception e) {
-            LOG.error("storm metrics failed! ", e);
-        }
-    }
-
-    public void collect(LocalCluster localCluster) {
-        try {
-            if (!first) {
-                this.lastSample = this.curSample;
-                this.curSample = MetricsSample.factory(localCluster, config.name);
-                updateStats(dataWriter);
-                writeLine(dataWriter);
-            } else {
-                LOG.info("Getting baseline metrics sample.");
-                writeHeader(dataWriter);
-                this.curSample = MetricsSample.factory(localCluster, config.name);
-                first = false;
-                startTime = System.currentTimeMillis();
-            }
-        } catch (Exception e) {
-            LOG.error("storm metrics failed! ", e);
-        }
-    }
-
-    public void close() {
-        dataWriter.close();
-    }
-
-    boolean updateStats(PrintWriter writer)
-            throws Exception {
-        if (collectTopologyStats) {
-            updateTopologyStats();
-        }
-        if (collectExecutorStats) {
-            updateExecutorStats();
-        }
-        return true;
-    }
-
-    void updateTopologyStats() {
-        long timeTotal = System.currentTimeMillis() - startTime;
-        int numWorkers = this.curSample.getNumWorkers();
-        int numExecutors = this.curSample.getNumExecutors();
-        int numTasks = this.curSample.getNumTasks();
-        metrics.put(TIME, String.format(TIME_FORMAT, timeTotal / 1000));
-        metrics.put(WORKERS, Integer.toString(numWorkers));
-        metrics.put(EXECUTORS, Integer.toString(numExecutors));
-        metrics.put(TASKS, Integer.toString(numTasks));
-    }
-
-    void updateExecutorStats() {
-        long timeDiff = this.curSample.getSampleTime() - this.lastSample.getSampleTime();
-        long transferredDiff = this.curSample.getTotalTransferred() - this.lastSample.getTotalTransferred();
-        long throughput = transferredDiff / (timeDiff / 1000);
-
-        long spoutDiff = this.curSample.getSpoutTransferred() - this.lastSample.getSpoutTransferred();
-        long spoutAckedDiff = this.curSample.getTotalAcked() - this.lastSample.getTotalAcked();
-        long spoutThroughput = spoutDiff / (timeDiff / 1000);
-
-        if (collectThroughput) {
-            metrics.put(TRANSFERRED, Long.toString(transferredDiff));
-            metrics.put(XSFER_RATE, Long.toString(throughput));
-        }
-
-        if (collectSpoutThroughput) {
-
-            metrics.put(SPOUT_EXECUTORS, Integer.toString(this.curSample.getSpoutExecutors()));
-            metrics.put(SPOUT_TRANSFERRED, Long.toString(spoutDiff));
-            metrics.put(SPOUT_ACKED, Long.toString(spoutAckedDiff));
-            metrics.put(SPOUT_THROUGHPUT, Long.toString(spoutThroughput));
-        }
-
-
-        if (collectSpoutLatency) {
-            double latency = this.curSample.getTotalLatency();
-            if (latency > this.maxLatency) {
-                this.maxLatency = latency;
-            }
-            metrics.put(SPOUT_AVG_COMPLETE_LATENCY,
-                    String.format(SPOUT_AVG_LATENCY_FORMAT, latency));
-            metrics.put(SPOUT_MAX_COMPLETE_LATENCY,
-                    String.format(SPOUT_MAX_LATENCY_FORMAT, this.maxLatency));
-
-        }
-    }
-
-
-    void writeHeader(PrintWriter writer) {
-        header.add(TIME);
-        if (collectTopologyStats) {
-            header.add(WORKERS);
-            header.add(TASKS);
-            header.add(EXECUTORS);
-        }
-
-        if (collectThroughput) {
-            header.add(TRANSFERRED);
-            header.add(XSFER_RATE);
-        }
-
-        if (collectSpoutThroughput) {
-            header.add(SPOUT_EXECUTORS);
-            header.add(SPOUT_TRANSFERRED);
-            header.add(SPOUT_ACKED);
-            header.add(SPOUT_THROUGHPUT);
-        }
-
-        if (collectSpoutLatency) {
-            header.add(SPOUT_AVG_COMPLETE_LATENCY);
-            header.add(SPOUT_MAX_COMPLETE_LATENCY);
-        }
-
-        writer.println("\n------------------------------------------------------------------------------------------------------------------");
-        String str = Utils.join(header, ",");
-        writer.println(str);
-        writer.println("------------------------------------------------------------------------------------------------------------------");
-        writer.flush();
-    }
-
-    void writeLine(PrintWriter writer) {
-        List<String> line = new LinkedList<String>();
-        for (String h : header) {
-            line.add(metrics.get(h));
-        }
-        String str = Utils.join(line, ",");
-        writer.println(str);
-        writer.flush();
-    }
-
-
-    boolean collectTopologyStats(Set<MetricsItem> items) {
-        return items.contains(MetricsItem.ALL) ||
-                items.contains(MetricsItem.TOPOLOGY_STATS);
-    }
-
-    boolean collectExecutorStats(Set<MetricsItem> items) {
-        return items.contains(MetricsItem.ALL) ||
-                items.contains(MetricsItem.XSFER_RATE) ||
-                items.contains(MetricsItem.SPOUT_LATENCY);
-    }
-
-    boolean collectThroughput(Set<MetricsItem> items) {
-        return items.contains(MetricsItem.ALL) ||
-                items.contains(MetricsItem.XSFER_RATE);
-    }
-
-    boolean collectSpoutThroughput(Set<MetricsItem> items) {
-        return items.contains(MetricsItem.ALL) ||
-                items.contains(MetricsItem.SPOUT_THROUGHPUT);
-    }
-
-    boolean collectSpoutLatency(Set<MetricsItem> items) {
-        return items.contains(MetricsItem.ALL) ||
-                items.contains(MetricsItem.SPOUT_LATENCY);
-    }
-
-
-
-    public static class MetricsCollectorConfig {
-        private static final Logger LOG = Logger.getLogger(MetricsCollectorConfig.class);
-
-        // storm configuration
-        public final Map stormConfig;
-        // storm topology name
-        public final String name;
-        // benchmark label
-        public final String label;
-
-        public MetricsCollectorConfig(String topoName, Map stormConfig) {
-            this.stormConfig = stormConfig;
-            String labelStr = (String) stormConfig.get("benchmark.label");
-            this.name = topoName;
-            if (labelStr == null) {
-                LOG.warn("'benchmark.label' not found in config. Defaulting to topology name");
-                labelStr = this.name;
-            }
-            this.label = labelStr;
-        }
-    } // MetricsCollectorConfig
-
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/storm/blob/2a369e40/storm-perf/src/main/java/org/apache/storm/perf/utils/Helper.java
----------------------------------------------------------------------
diff --git a/storm-perf/src/main/java/org/apache/storm/perf/utils/Helper.java b/storm-perf/src/main/java/org/apache/storm/perf/utils/Helper.java
deleted file mode 100755
index 8bcd84f..0000000
--- a/storm-perf/src/main/java/org/apache/storm/perf/utils/Helper.java
+++ /dev/null
@@ -1,131 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License
- */
-
-package org.apache.storm.perf.utils;
-
-import org.apache.storm.Config;
-import org.apache.storm.LocalCluster;
-import org.apache.storm.StormSubmitter;
-import org.apache.storm.generated.KillOptions;
-import org.apache.storm.generated.Nimbus;
-import org.apache.storm.generated.StormTopology;
-import org.apache.storm.utils.NimbusClient;
-import org.apache.storm.utils.Utils;
-
-import java.util.Map;
-
-
-public class Helper {
-
-  public static void kill(Nimbus.Client client, String topoName) throws Exception {
-    KillOptions opts = new KillOptions();
-    opts.set_wait_secs(0);
-    client.killTopologyWithOpts(topoName, opts);
-  }
-
-  public static void killAndShutdownCluster(LocalCluster cluster, String topoName) throws Exception {
-    KillOptions opts = new KillOptions();
-    opts.set_wait_secs(0);
-    cluster.killTopologyWithOpts(topoName, opts);
-    cluster.shutdown();
-  }
-
-
-    public static LocalCluster runOnLocalCluster(String topoName, StormTopology topology) throws Exception {
-        LocalCluster cluster = new LocalCluster();
-        cluster.submitTopology(topoName, new Config(), topology);
-        return cluster;
-    }
-
-    public static int getInt(Map map, Object key, int def) {
-        return Utils.getInt(Utils.get(map, key, def));
-    }
-
-    public static String getStr(Map map, Object key) {
-        return (String) map.get(key);
-    }
-
-    public static void collectMetricsAndKill(String topologyName, Integer pollInterval, Integer duration) throws Exception {
-        Map clusterConf = Utils.readStormConfig();
-        Nimbus.Client client = NimbusClient.getConfiguredClient(clusterConf).getClient();
-        BasicMetricsCollector metricsCollector = new BasicMetricsCollector(client, topologyName, clusterConf);
-
-        int times = duration / pollInterval;
-        metricsCollector.collect(client);
-        for (int i = 0; i < times; i++) {
-            Thread.sleep(pollInterval * 1000);
-            metricsCollector.collect(client);
-        }
-        metricsCollector.close();
-        kill(client, topologyName);
-    }
-
-    public static void collectLocalMetricsAndKill(LocalCluster localCluster, String topologyName, Integer pollInterval, Integer duration, Map clusterConf) throws Exception {
-        BasicMetricsCollector metricsCollector = new BasicMetricsCollector(localCluster, topologyName, clusterConf);
-
-        int times = duration / pollInterval;
-        metricsCollector.collect(localCluster);
-        for (int i = 0; i < times; i++) {
-            Thread.sleep(pollInterval * 1000);
-            metricsCollector.collect(localCluster);
-        }
-        metricsCollector.close();
-        killAndShutdownCluster(localCluster, topologyName);
-    }
-
-    /** Kill topo and Shutdown local cluster on Ctrl-C */
-  public static void setupShutdownHook(final LocalCluster cluster, final String topoName) {
-    Runtime.getRuntime().addShutdownHook(new Thread() {
-      public void run() {
-          try {
-              cluster.killTopology(topoName);
-              System.out.println("Killed Topology");
-          } catch (Exception e) {
-              System.err.println("Encountered error in killing topology: " + e);
-          }
-        cluster.shutdown();
-      }
-    });
-  }
-
-  /** Kill topo on Ctrl-C */
-  public static void setupShutdownHook(final String topoName) {
-    Map clusterConf = Utils.readStormConfig();
-    final Nimbus.Client client = NimbusClient.getConfiguredClient(clusterConf).getClient();
-    Runtime.getRuntime().addShutdownHook(new Thread() {
-      public void run() {
-        try {
-          Helper.kill(client, topoName);
-          System.out.println("Killed Topology");
-        } catch (Exception e) {
-          e.printStackTrace();
-        }
-      }
-    });
-  }
-
-    public static void runOnClusterAndPrintMetrics(Integer durationSec, String topoName, Map topoConf, StormTopology topology) throws Exception {
-      // submit topology
-      StormSubmitter.submitTopologyWithProgressBar(topoName, topoConf, topology);
-      setupShutdownHook(topoName); // handle Ctrl-C
-
-      // poll metrics every minute, then kill topology after specified duration
-      Integer pollIntervalSec = 60;
-      collectMetricsAndKill(topoName, pollIntervalSec, durationSec);
-    }
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/2a369e40/storm-perf/src/main/java/org/apache/storm/perf/utils/IdentityBolt.java
----------------------------------------------------------------------
diff --git a/storm-perf/src/main/java/org/apache/storm/perf/utils/IdentityBolt.java b/storm-perf/src/main/java/org/apache/storm/perf/utils/IdentityBolt.java
deleted file mode 100755
index 396ad53..0000000
--- a/storm-perf/src/main/java/org/apache/storm/perf/utils/IdentityBolt.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License
- */
-
-package org.apache.storm.perf.utils;
-
-
-import org.apache.storm.task.OutputCollector;
-import org.apache.storm.task.TopologyContext;
-import org.apache.storm.topology.OutputFieldsDeclarer;
-import org.apache.storm.topology.base.BaseRichBolt;
-import org.apache.storm.tuple.Tuple;
-import org.apache.storm.tuple.Values;
-
-import java.util.Map;
-
-
-public class IdentityBolt extends BaseRichBolt {
-    private OutputCollector collector;
-
-    @Override
-    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
-        this.collector = collector;
-    }
-
-    @Override
-    public void execute(Tuple tuple) {
-        collector.emit(tuple, tuple.getValues() );
-        collector.ack(tuple);
-    }
-
-    @Override
-    public void declareOutputFields(OutputFieldsDeclarer declarer) {
-
-    }
-}
-