You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by bo...@apache.org on 2018/02/14 14:07:42 UTC

[7/9] storm git commit: STORM-2306 - Messaging subsystem redesign. New Backpressure model.

http://git-wip-us.apache.org/repos/asf/storm/blob/bc4c4807/examples/storm-perf/src/main/java/org/apache/storm/perf/LowThroughputTopo.java
----------------------------------------------------------------------
diff --git a/examples/storm-perf/src/main/java/org/apache/storm/perf/LowThroughputTopo.java b/examples/storm-perf/src/main/java/org/apache/storm/perf/LowThroughputTopo.java
new file mode 100644
index 0000000..13488aa
--- /dev/null
+++ b/examples/storm-perf/src/main/java/org/apache/storm/perf/LowThroughputTopo.java
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License
+ */
+
+package org.apache.storm.perf;
+
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.storm.Config;
+import org.apache.storm.generated.StormTopology;
+import org.apache.storm.perf.utils.Helper;
+import org.apache.storm.spout.SpoutOutputCollector;
+import org.apache.storm.task.OutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.topology.BoltDeclarer;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.topology.TopologyBuilder;
+import org.apache.storm.topology.base.BaseRichBolt;
+import org.apache.storm.topology.base.BaseRichSpout;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Tuple;
+import org.apache.storm.utils.ObjectReader;
+import org.apache.storm.utils.Utils;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Latency probe topology: a throttled spout emits the current wall-clock time and a bolt
+ * prints how long each tuple took to arrive.
+ *
+ * <p>Usage: {@code spoutSleepMs [runDurationSec] [optionalConfFile]}
+ */
+public class LowThroughputTopo {
+    private static final String SPOUT_ID = "ThrottledSpout";
+    private static final String BOLT_ID = "LatencyPrintBolt";
+    // NOTE(review): these Integers are handed to Helper.getInt() in the key position, so the
+    // parallelism presumably always falls back to the default of 1 - confirm against Helper.
+    private static final Integer SPOUT_COUNT = 1;
+    private static final Integer BOLT_COUNT = 1;
+    /** Config key holding the pause (in ms) between two spout emits. */
+    private static final String SLEEP_MS = "sleep";
+
+    /**
+     * Builds the ThrottledSpout -> LatencyPrintBolt topology.
+     *
+     * @param conf topology config; the spout pause is read from {@link #SLEEP_MS}
+     * @return the assembled topology
+     */
+    static StormTopology getTopology(Map<String, Object> conf) {
+        Long sleepMs = ObjectReader.getLong(conf.get(SLEEP_MS));
+
+        // 1 -  Setup Spout   --------
+        ThrottledSpout spout = new ThrottledSpout(sleepMs).withOutputFields(ThrottledSpout.DEFAULT_FIELD_NAME);
+
+        // 2 -  Setup LatencyPrint Bolt   --------
+        LatencyPrintBolt bolt = new LatencyPrintBolt();
+
+        // 3 - Setup Topology  --------
+        TopologyBuilder builder = new TopologyBuilder();
+        builder.setSpout(SPOUT_ID, spout, Helper.getInt(conf, SPOUT_COUNT, 1));
+        BoltDeclarer bd = builder.setBolt(BOLT_ID, bolt, Helper.getInt(conf, BOLT_COUNT, 1));
+        bd.localOrShuffleGrouping(SPOUT_ID);
+        return builder.createTopology();
+    }
+
+    /**
+     * Parses the command line, assembles the topology config and submits the topology.
+     *
+     * @param args {@code spoutSleepMs [runDurationSec] [optionalConfFile]}
+     */
+    public static void main(String[] args) throws Exception {
+        // Validate before touching args[]: the sleep time is mandatory, the rest is optional.
+        if (args.length < 1 || args.length > 3) {
+            System.err.println("args: spoutSleepMs [runDurationSec] [optionalConfFile]");
+            return;
+        }
+        long sleepMs = Long.parseLong(args[0]);
+        int runTime = (args.length > 1) ? Integer.parseInt(args[1]) : -1;
+
+        Map<String, Object> topoConf = new HashMap<>();
+        if (args.length > 2) {
+            topoConf.putAll(Utils.findAndReadConfigFile(args[2]));
+        }
+        topoConf.put(Config.TOPOLOGY_SPOUT_RECVQ_SKIPS, 1);
+        topoConf.put(SLEEP_MS, sleepMs);
+        // Command line options are applied last so they override everything above.
+        topoConf.putAll(Utils.readCommandLineOpts());
+
+        //  Submit topology to storm cluster
+        Helper.runOnClusterAndPrintMetrics(runTime, "LowThroughputTopo", topoConf, getTopology(topoConf));
+    }
+
+    /** Spout that emits the current time in ms (also used as the message id), then sleeps. */
+    private static class ThrottledSpout extends BaseRichSpout {
+
+        static final String DEFAULT_FIELD_NAME = "time";
+        private final long sleepTimeMs;
+        private String fieldName = DEFAULT_FIELD_NAME;
+        private SpoutOutputCollector collector = null;
+
+        public ThrottledSpout(long sleepMs) {
+            this.sleepTimeMs = sleepMs;
+        }
+
+        /** Sets the name of the single output field; returns this spout for chaining. */
+        public ThrottledSpout withOutputFields(String fieldName) {
+            this.fieldName = fieldName;
+            return this;
+        }
+
+        @Override
+        public void declareOutputFields(OutputFieldsDeclarer declarer) {
+            declarer.declare(new Fields(fieldName));
+        }
+
+        @Override
+        public void open(Map<String, Object> conf, TopologyContext context, SpoutOutputCollector collector) {
+            this.collector = collector;
+        }
+
+        @Override
+        public void nextTuple() {
+            Long now = System.currentTimeMillis();
+            List<Object> tuple = Collections.singletonList(now);
+            collector.emit(tuple, now);
+            Utils.sleep(sleepTimeMs);
+        }
+
+        @Override
+        public void ack(Object msgId) {
+            super.ack(msgId);
+        }
+    }
+
+    /** Bolt that logs and prints the difference between now and the emit time carried by the tuple. */
+    private static class LatencyPrintBolt extends BaseRichBolt {
+        private static final org.slf4j.Logger LOG = LoggerFactory.getLogger(LatencyPrintBolt.class);
+        private OutputCollector collector;
+
+        @Override
+        public void prepare(Map<String, Object> topoConf, TopologyContext context, OutputCollector collector) {
+            this.collector = collector;
+        }
+
+        @Override
+        public void execute(Tuple tuple) {
+            long now = System.currentTimeMillis();
+            long then = (Long) tuple.getValues().get(0);
+            long latencyMs = now - then;
+            LOG.warn("Latency {} ", latencyMs);
+            System.err.println(latencyMs);
+            collector.ack(tuple);
+        }
+
+        @Override
+        public void declareOutputFields(OutputFieldsDeclarer declarer) {
+            // Terminal bolt: emits nothing.
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/bc4c4807/examples/storm-perf/src/main/java/org/apache/storm/perf/SimplifiedWordCountTopo.java
----------------------------------------------------------------------
diff --git a/examples/storm-perf/src/main/java/org/apache/storm/perf/SimplifiedWordCountTopo.java b/examples/storm-perf/src/main/java/org/apache/storm/perf/SimplifiedWordCountTopo.java
new file mode 100644
index 0000000..6d368a0
--- /dev/null
+++ b/examples/storm-perf/src/main/java/org/apache/storm/perf/SimplifiedWordCountTopo.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License
+ */
+
+package org.apache.storm.perf;
+
+import java.util.Map;
+
+import org.apache.storm.Config;
+import org.apache.storm.generated.StormTopology;
+import org.apache.storm.perf.bolt.CountBolt;
+import org.apache.storm.perf.spout.WordGenSpout;
+import org.apache.storm.perf.utils.Helper;
+import org.apache.storm.topology.TopologyBuilder;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.utils.Utils;
+
+/**
+ * Word count benchmark with a two-stage pipeline.
+ *
+ * <p>Topology:  WordGenSpout -> FieldsGrouping -> CountBolt
+ *
+ * <p>Usage: {@code file.txt [runDurationSec] [optionalConfFile]}
+ */
+public class SimplifiedWordCountTopo {
+
+    public static final String SPOUT_ID = "spout";
+    public static final String COUNT_ID = "counter";
+    public static final String TOPOLOGY_NAME = "SimplifiedWordCountTopo";
+
+    // Config settings
+    public static final String SPOUT_NUM = "spout.count";
+    public static final String BOLT_NUM = "counter.count";
+    public static final String INPUT_FILE = "input.file";
+
+    public static final int DEFAULT_SPOUT_NUM = 1;
+    public static final int DEFAULT_COUNT_BOLT_NUM = 1;
+
+    /**
+     * Wires a WordGenSpout to a CountBolt using a fields grouping on WordGenSpout.FIELDS.
+     *
+     * @param config supplies {@link #SPOUT_NUM}, {@link #BOLT_NUM} and {@link #INPUT_FILE}
+     * @return the assembled topology
+     */
+    static StormTopology getTopology(Map config) {
+        final int spoutNum = Helper.getInt(config, SPOUT_NUM, DEFAULT_SPOUT_NUM);
+        final int cntBoltNum = Helper.getInt(config, BOLT_NUM, DEFAULT_COUNT_BOLT_NUM);
+        final String inputFile = Helper.getStr(config, INPUT_FILE);
+
+        TopologyBuilder builder = new TopologyBuilder();
+        builder.setSpout(SPOUT_ID, new WordGenSpout(inputFile), spoutNum);
+        builder.setBolt(COUNT_ID, new CountBolt(), cntBoltNum).fieldsGrouping(SPOUT_ID, new Fields(WordGenSpout.FIELDS));
+
+        return builder.createTopology();
+    }
+
+    /**
+     * Parses the command line, assembles the topology config and submits the topology.
+     *
+     * @param args {@code file.txt [runDurationSec] [optionalConfFile]}
+     */
+    public static void main(String[] args) throws Exception {
+        // Validate first so that every args[] access below is known to be in range.
+        if (args.length == 0 || args.length > 3) {
+            System.err.println("args: file.txt [runDurationSec]  [optionalConfFile]");
+            return;
+        }
+
+        int runTime = -1;
+        Config topoConf = new Config();
+        topoConf.put(INPUT_FILE, args[0]);
+        if (args.length > 1) {
+            runTime = Integer.parseInt(args[1]);
+        }
+        if (args.length > 2) {
+            // The config file is the third argument; args[1] is the run duration.
+            topoConf.putAll(Utils.findAndReadConfigFile(args[2]));
+        }
+        topoConf.put(Config.TOPOLOGY_SPOUT_RECVQ_SKIPS, 8);
+        topoConf.put(Config.TOPOLOGY_PRODUCER_BATCH_SIZE, 1000);
+        topoConf.put(Config.TOPOLOGY_DISABLE_LOADAWARE_MESSAGING, true);
+        topoConf.put(Config.TOPOLOGY_STATS_SAMPLE_RATE, 0.0005);
+        topoConf.put(Config.TOPOLOGY_BOLT_WAIT_STRATEGY, "org.apache.storm.policy.WaitStrategyPark");
+        topoConf.put(Config.TOPOLOGY_BOLT_WAIT_PARK_MICROSEC, 0);
+
+        topoConf.putAll(Utils.readCommandLineOpts());
+        //  Submit topology to storm cluster
+        Helper.runOnClusterAndPrintMetrics(runTime, TOPOLOGY_NAME, topoConf, getTopology(topoConf));
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/bc4c4807/examples/storm-perf/src/main/java/org/apache/storm/perf/StrGenSpoutHdfsBoltTopo.java
----------------------------------------------------------------------
diff --git a/examples/storm-perf/src/main/java/org/apache/storm/perf/StrGenSpoutHdfsBoltTopo.java b/examples/storm-perf/src/main/java/org/apache/storm/perf/StrGenSpoutHdfsBoltTopo.java
index 61cf394..a3650c6 100755
--- a/examples/storm-perf/src/main/java/org/apache/storm/perf/StrGenSpoutHdfsBoltTopo.java
+++ b/examples/storm-perf/src/main/java/org/apache/storm/perf/StrGenSpoutHdfsBoltTopo.java
@@ -21,6 +21,7 @@ package org.apache.storm.perf;
 
 import java.util.Map;
 
+import org.apache.storm.Config;
 import org.apache.storm.generated.StormTopology;
 import org.apache.storm.hdfs.bolt.HdfsBolt;
 import org.apache.storm.hdfs.bolt.format.DefaultFileNameFormat;
@@ -46,11 +47,11 @@ public class StrGenSpoutHdfsBoltTopo {
 
     // configs - topo parallelism
     public static final String SPOUT_NUM = "spout.count";
-    public static final String BOLT_NUM =  "bolt.count";
+    public static final String BOLT_NUM = "bolt.count";
 
     // configs - hdfs bolt
-    public static final String HDFS_URI   = "hdfs.uri";
-    public static final String HDFS_PATH  = "hdfs.dir";
+    public static final String HDFS_URI = "hdfs.uri";
+    public static final String HDFS_PATH = "hdfs.dir";
     public static final String HDFS_BATCH = "hdfs.batch";
 
     public static final int DEFAULT_SPOUT_NUM = 1;
@@ -63,7 +64,7 @@ public class StrGenSpoutHdfsBoltTopo {
     public static final String BOLT_ID = "hdfsBolt";
 
 
-    public static StormTopology getTopology(Map<String, Object> topoConf) {
+    static StormTopology getTopology(Map<String, Object> topoConf) {
         final int hdfsBatch = Helper.getInt(topoConf, HDFS_BATCH, DEFAULT_HDFS_BATCH);
 
         // 1 -  Setup StringGen Spout   --------
@@ -79,15 +80,15 @@ public class StrGenSpoutHdfsBoltTopo {
         final int boltNum = Helper.getInt(topoConf, BOLT_NUM, DEFAULT_BOLT_NUM);
 
         // Use default, Storm-generated file names
-        FileNameFormat fileNameFormat = new DefaultFileNameFormat().withPath(Helper.getStr(topoConf, HDFS_PATH) );
+        FileNameFormat fileNameFormat = new DefaultFileNameFormat().withPath(Helper.getStr(topoConf, HDFS_PATH));
 
         // Instantiate the HdfsBolt
         HdfsBolt bolt = new HdfsBolt()
-                .withFsUrl(Hdfs_url)
-                .withFileNameFormat(fileNameFormat)
-                .withRecordFormat(format)
-                .withRotationPolicy(rotationPolicy)
-                .withSyncPolicy(syncPolicy);
+            .withFsUrl(Hdfs_url)
+            .withFileNameFormat(fileNameFormat)
+            .withRecordFormat(format)
+            .withRotationPolicy(rotationPolicy)
+            .withSyncPolicy(syncPolicy);
 
 
         // 3 - Setup Topology  --------
@@ -95,13 +96,15 @@ public class StrGenSpoutHdfsBoltTopo {
         TopologyBuilder builder = new TopologyBuilder();
         builder.setSpout(SPOUT_ID, spout, spoutNum);
         builder.setBolt(BOLT_ID, bolt, boltNum)
-                .localOrShuffleGrouping(SPOUT_ID);
+            .localOrShuffleGrouping(SPOUT_ID);
 
         return builder.createTopology();
     }
 
 
-    /** Spout generates random strings and HDFS bolt writes them to a text file */
+    /**
+     * Spout generates random strings and HDFS bolt writes them to a text file
+     */
     public static void main(String[] args) throws Exception {
         String confFile = "conf/HdfsSpoutTopo.yaml";
         int runTime = -1; //Run until Ctrl-C
@@ -120,7 +123,13 @@ public class StrGenSpoutHdfsBoltTopo {
         }
 
         Map<String, Object> topoConf = Utils.findAndReadConfigFile(confFile);
+        topoConf.put(Config.TOPOLOGY_PRODUCER_BATCH_SIZE, 1000);
+        topoConf.put(Config.TOPOLOGY_BOLT_WAIT_STRATEGY, "org.apache.storm.policy.WaitStrategyPark");
+        topoConf.put(Config.TOPOLOGY_BOLT_WAIT_PARK_MICROSEC, 0);
+        topoConf.put(Config.TOPOLOGY_DISABLE_LOADAWARE_MESSAGING, true);
+        topoConf.put(Config.TOPOLOGY_STATS_SAMPLE_RATE, 0.0005);
 
+        topoConf.putAll(Utils.readCommandLineOpts());
         Helper.runOnClusterAndPrintMetrics(runTime, TOPOLOGY_NAME, topoConf, getTopology(topoConf));
     }
 
@@ -140,13 +149,13 @@ public class StrGenSpoutHdfsBoltTopo {
          * @param delimiter
          * @return
          */
-        public LineWriter withLineDelimiter(String delimiter){
+        public LineWriter withLineDelimiter(String delimiter) {
             this.lineDelimiter = delimiter;
             return this;
         }
 
         public byte[] format(Tuple tuple) {
-            return (tuple.getValueByField(fieldName).toString() +  this.lineDelimiter).getBytes();
+            return (tuple.getValueByField(fieldName).toString() + this.lineDelimiter).getBytes();
         }
     }
 

http://git-wip-us.apache.org/repos/asf/storm/blob/bc4c4807/examples/storm-perf/src/main/java/org/apache/storm/perf/ThroughputMeter.java
----------------------------------------------------------------------
diff --git a/examples/storm-perf/src/main/java/org/apache/storm/perf/ThroughputMeter.java b/examples/storm-perf/src/main/java/org/apache/storm/perf/ThroughputMeter.java
new file mode 100644
index 0000000..09aec7d
--- /dev/null
+++ b/examples/storm-perf/src/main/java/org/apache/storm/perf/ThroughputMeter.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License
+ */
+
+package org.apache.storm.perf;
+
+/**
+ * Counts events and converts the count into a rate over the elapsed wall-clock time.
+ *
+ * <p>Not synchronized.
+ */
+public class ThroughputMeter {
+
+    private final String name;
+    private long startTime = 0;
+    private int count;
+    private long endTime = 0;
+
+    /** Creates a named meter and starts timing immediately. */
+    public ThroughputMeter(String name) {
+        this.name = name;
+        this.startTime = System.currentTimeMillis();
+    }
+
+    public String getName() {
+        return name;
+    }
+
+    /** Records one event. */
+    public void record() {
+        ++count;
+    }
+
+    /**
+     * Stops the meter (the first call fixes the end time) and reports the throughput.
+     *
+     * @return events/sec between the start of the current measurement window and the stop time
+     */
+    public double stop() {
+        if (startTime == 0) {
+            return 0;
+        }
+        if (endTime == 0) {
+            this.endTime = System.currentTimeMillis();
+        }
+        return calcThroughput(count, startTime, endTime);
+    }
+
+    /**
+     * Returns the throughput recorded since the last call to getCurrentThroughput(), or since
+     * this meter was instantiated if being called for the first time, then starts a new window.
+     *
+     * @return thousands of events per second (K/sec)
+     */
+    public double getCurrentThroughput() {
+        if (startTime == 0) {
+            return 0;
+        }
+        long currTime = (endTime == 0) ? System.currentTimeMillis() : endTime;
+
+        double result = calcThroughput(count, startTime, currTime) / 1000; // K/sec
+        startTime = currTime;
+        count = 0;
+        return result;
+    }
+
+    /**
+     * Computes a rate in floating point so that fractional events-per-millisecond are kept.
+     *
+     * @return events/sec, or 0 when no time has elapsed (the rate is undefined for an empty interval)
+     */
+    private static double calcThroughput(long count, long startTime, long endTime) {
+        long gap = endTime - startTime;
+        if (gap <= 0) {
+            return 0;
+        }
+        return (count * 1000.0) / gap;
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/bc4c4807/examples/storm-perf/src/main/java/org/apache/storm/perf/bolt/CountBolt.java
----------------------------------------------------------------------
diff --git a/examples/storm-perf/src/main/java/org/apache/storm/perf/bolt/CountBolt.java b/examples/storm-perf/src/main/java/org/apache/storm/perf/bolt/CountBolt.java
index 3286ac3..368699b 100644
--- a/examples/storm-perf/src/main/java/org/apache/storm/perf/bolt/CountBolt.java
+++ b/examples/storm-perf/src/main/java/org/apache/storm/perf/bolt/CountBolt.java
@@ -18,6 +18,8 @@
 
 package org.apache.storm.perf.bolt;
 
+import java.util.HashMap;
+import java.util.Map;
 
 import org.apache.storm.task.TopologyContext;
 import org.apache.storm.topology.BasicOutputCollector;
@@ -27,8 +29,6 @@ import org.apache.storm.tuple.Fields;
 import org.apache.storm.tuple.Tuple;
 import org.apache.storm.tuple.Values;
 
-import java.util.HashMap;
-import java.util.Map;
 
 public class CountBolt extends BaseBasicBolt {
     public static final String FIELDS_WORD = "word";
@@ -44,8 +44,9 @@ public class CountBolt extends BaseBasicBolt {
     public void execute(Tuple tuple, BasicOutputCollector collector) {
         String word = tuple.getString(0);
         Integer count = counts.get(word);
-        if (count == null)
+        if (count == null) {
             count = 0;
+        }
         count++;
         counts.put(word, count);
         collector.emit(new Values(word, count));

http://git-wip-us.apache.org/repos/asf/storm/blob/bc4c4807/examples/storm-perf/src/main/java/org/apache/storm/perf/bolt/DevNullBolt.java
----------------------------------------------------------------------
diff --git a/examples/storm-perf/src/main/java/org/apache/storm/perf/bolt/DevNullBolt.java b/examples/storm-perf/src/main/java/org/apache/storm/perf/bolt/DevNullBolt.java
index f9d045e..5f9c710 100755
--- a/examples/storm-perf/src/main/java/org/apache/storm/perf/bolt/DevNullBolt.java
+++ b/examples/storm-perf/src/main/java/org/apache/storm/perf/bolt/DevNullBolt.java
@@ -18,26 +18,37 @@
 
 package org.apache.storm.perf.bolt;
 
+import java.util.Map;
+import java.util.concurrent.locks.LockSupport;
+
 import org.apache.storm.task.OutputCollector;
 import org.apache.storm.task.TopologyContext;
 import org.apache.storm.topology.OutputFieldsDeclarer;
 import org.apache.storm.topology.base.BaseRichBolt;
 import org.apache.storm.tuple.Tuple;
-
-import java.util.Map;
+import org.apache.storm.utils.ObjectReader;
+import org.slf4j.LoggerFactory;
 
 
 public class DevNullBolt extends BaseRichBolt {
+    private static final org.slf4j.Logger LOG = LoggerFactory.getLogger(DevNullBolt.class);
     private OutputCollector collector;
+    private Long sleepNanos;
+    private int eCount = 0;
 
     @Override
     public void prepare(Map<String, Object> topoConf, TopologyContext context, OutputCollector collector) {
         this.collector = collector;
+        this.sleepNanos = ObjectReader.getLong(topoConf.get("nullbolt.sleep.micros"), 0L) * 1_000;
     }
 
     @Override
     public void execute(Tuple tuple) {
         collector.ack(tuple);
+        if (sleepNanos > 0) {
+            LockSupport.parkNanos(sleepNanos);
+        }
+        ++eCount;
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/storm/blob/bc4c4807/examples/storm-perf/src/main/java/org/apache/storm/perf/bolt/IdBolt.java
----------------------------------------------------------------------
diff --git a/examples/storm-perf/src/main/java/org/apache/storm/perf/bolt/IdBolt.java b/examples/storm-perf/src/main/java/org/apache/storm/perf/bolt/IdBolt.java
index 3735447..0644e31 100644
--- a/examples/storm-perf/src/main/java/org/apache/storm/perf/bolt/IdBolt.java
+++ b/examples/storm-perf/src/main/java/org/apache/storm/perf/bolt/IdBolt.java
@@ -18,6 +18,8 @@
 
 package org.apache.storm.perf.bolt;
 
+import java.util.Map;
+
 import org.apache.storm.task.OutputCollector;
 import org.apache.storm.task.TopologyContext;
 import org.apache.storm.topology.OutputFieldsDeclarer;
@@ -26,8 +28,6 @@ import org.apache.storm.tuple.Fields;
 import org.apache.storm.tuple.Tuple;
 import org.apache.storm.tuple.Values;
 
-import java.util.Map;
-
 public class IdBolt extends BaseRichBolt {
     private OutputCollector collector;
 
@@ -38,7 +38,7 @@ public class IdBolt extends BaseRichBolt {
 
     @Override
     public void execute(Tuple tuple) {
-        collector.emit(tuple, new Values( tuple.getValues() ) );
+        collector.emit(tuple, new Values(tuple.getValues()));
         collector.ack(tuple);
     }
 

http://git-wip-us.apache.org/repos/asf/storm/blob/bc4c4807/examples/storm-perf/src/main/java/org/apache/storm/perf/bolt/SplitSentenceBolt.java
----------------------------------------------------------------------
diff --git a/examples/storm-perf/src/main/java/org/apache/storm/perf/bolt/SplitSentenceBolt.java b/examples/storm-perf/src/main/java/org/apache/storm/perf/bolt/SplitSentenceBolt.java
index f32628d..abb5af8 100644
--- a/examples/storm-perf/src/main/java/org/apache/storm/perf/bolt/SplitSentenceBolt.java
+++ b/examples/storm-perf/src/main/java/org/apache/storm/perf/bolt/SplitSentenceBolt.java
@@ -18,6 +18,8 @@
 
 package org.apache.storm.perf.bolt;
 
+import java.util.Map;
+
 import org.apache.storm.task.TopologyContext;
 import org.apache.storm.topology.BasicOutputCollector;
 import org.apache.storm.topology.OutputFieldsDeclarer;
@@ -26,12 +28,17 @@ import org.apache.storm.tuple.Fields;
 import org.apache.storm.tuple.Tuple;
 import org.apache.storm.tuple.Values;
 
-import java.util.Map;
-
 
 public class SplitSentenceBolt extends BaseBasicBolt {
     public static final String FIELDS = "word";
 
+    public static String[] splitSentence(String sentence) {
+        if (sentence != null) {
+            return sentence.split("\\s+");
+        }
+        return null;
+    }
+
     @Override
     public void prepare(Map<String, Object> topoConf, TopologyContext context) {
     }
@@ -47,12 +54,4 @@ public class SplitSentenceBolt extends BaseBasicBolt {
     public void declareOutputFields(OutputFieldsDeclarer declarer) {
         declarer.declare(new Fields(FIELDS));
     }
-
-
-    public static String[] splitSentence(String sentence) {
-        if (sentence != null) {
-            return sentence.split("\\s+");
-        }
-        return null;
-    }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/bc4c4807/examples/storm-perf/src/main/java/org/apache/storm/perf/spout/ConstSpout.java
----------------------------------------------------------------------
diff --git a/examples/storm-perf/src/main/java/org/apache/storm/perf/spout/ConstSpout.java b/examples/storm-perf/src/main/java/org/apache/storm/perf/spout/ConstSpout.java
index afd2ebc..46f12ab 100755
--- a/examples/storm-perf/src/main/java/org/apache/storm/perf/spout/ConstSpout.java
+++ b/examples/storm-perf/src/main/java/org/apache/storm/perf/spout/ConstSpout.java
@@ -18,16 +18,15 @@
 
 package org.apache.storm.perf.spout;
 
+import java.util.ArrayList;
+import java.util.Map;
 
 import org.apache.storm.spout.SpoutOutputCollector;
 import org.apache.storm.task.TopologyContext;
 import org.apache.storm.topology.OutputFieldsDeclarer;
 import org.apache.storm.topology.base.BaseRichSpout;
 import org.apache.storm.tuple.Fields;
-
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
+import org.apache.storm.utils.ObjectReader;
 
 public class ConstSpout extends BaseRichSpout {
 
@@ -35,7 +34,9 @@ public class ConstSpout extends BaseRichSpout {
     private String value;
     private String fieldName = DEFAUT_FIELD_NAME;
     private SpoutOutputCollector collector = null;
-    private int count=0;
+    private int count = 0;
+    private Long sleep = 0L;
+    private int ackCount = 0;
 
     public ConstSpout(String value) {
         this.value = value;
@@ -54,16 +55,26 @@ public class ConstSpout extends BaseRichSpout {
     @Override
     public void open(Map<String, Object> conf, TopologyContext context, SpoutOutputCollector collector) {
         this.collector = collector;
+        this.sleep = ObjectReader.getLong(conf.get("spout.sleep"), 0L);
     }
 
     @Override
     public void nextTuple() {
-        List<Object> tuple = Collections.singletonList((Object) value);
+        ArrayList<Object> tuple = new ArrayList<Object>(1);
+        tuple.add(value);
         collector.emit(tuple, count++);
+        try {
+            if (sleep > 0) {
+                Thread.sleep(sleep);
+            }
+        } catch (InterruptedException e) {
+            return;
+        }
     }
 
     @Override
     public void ack(Object msgId) {
+        ++ackCount;
         super.ack(msgId);
     }
 

http://git-wip-us.apache.org/repos/asf/storm/blob/bc4c4807/examples/storm-perf/src/main/java/org/apache/storm/perf/spout/FileReadSpout.java
----------------------------------------------------------------------
diff --git a/examples/storm-perf/src/main/java/org/apache/storm/perf/spout/FileReadSpout.java b/examples/storm-perf/src/main/java/org/apache/storm/perf/spout/FileReadSpout.java
index 4815a02..4f27d3b 100644
--- a/examples/storm-perf/src/main/java/org/apache/storm/perf/spout/FileReadSpout.java
+++ b/examples/storm-perf/src/main/java/org/apache/storm/perf/spout/FileReadSpout.java
@@ -18,13 +18,6 @@
 
 package org.apache.storm.perf.spout;
 
-import org.apache.storm.spout.SpoutOutputCollector;
-import org.apache.storm.task.TopologyContext;
-import org.apache.storm.topology.OutputFieldsDeclarer;
-import org.apache.storm.topology.base.BaseRichSpout;
-import org.apache.storm.tuple.Fields;
-import org.apache.storm.tuple.Values;
-
 import java.io.BufferedReader;
 import java.io.FileInputStream;
 import java.io.IOException;
@@ -35,6 +28,13 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 
+import org.apache.storm.spout.SpoutOutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.topology.base.BaseRichSpout;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Values;
+
 public class FileReadSpout extends BaseRichSpout {
     public static final String FIELDS = "sentence";
     private static final long serialVersionUID = -2582705611472467172L;
@@ -55,6 +55,26 @@ public class FileReadSpout extends BaseRichSpout {
         this.reader = reader;
     }
 
+    public static List<String> readLines(InputStream input) {
+        List<String> lines = new ArrayList<>();
+        try {
+            BufferedReader reader = new BufferedReader(new InputStreamReader(input));
+            try {
+                String line;
+                while ((line = reader.readLine()) != null) {
+                    lines.add(line);
+                }
+            } catch (IOException e) {
+                throw new RuntimeException("Reading file failed", e);
+            } finally {
+                reader.close();
+            }
+        } catch (IOException e) {
+            throw new RuntimeException("Error closing reader", e);
+        }
+        return lines;
+    }
+
     @Override
     public void open(Map<String, Object> conf, TopologyContext context,
                      SpoutOutputCollector collector) {
@@ -84,26 +104,6 @@ public class FileReadSpout extends BaseRichSpout {
         declarer.declare(new Fields(FIELDS));
     }
 
-    public static List<String> readLines(InputStream input) {
-        List<String> lines = new ArrayList<>();
-        try {
-            BufferedReader reader = new BufferedReader(new InputStreamReader(input));
-            try {
-                String line;
-                while ((line = reader.readLine()) != null) {
-                    lines.add(line);
-                }
-            } catch (IOException e) {
-                throw new RuntimeException("Reading file failed", e);
-            } finally {
-                reader.close();
-            }
-        } catch (IOException e) {
-            throw new RuntimeException("Error closing reader", e);
-        }
-        return lines;
-    }
-
     public static class FileReader implements Serializable {
 
         private static final long serialVersionUID = -7012334600647556267L;

http://git-wip-us.apache.org/repos/asf/storm/blob/bc4c4807/examples/storm-perf/src/main/java/org/apache/storm/perf/spout/StringGenSpout.java
----------------------------------------------------------------------
diff --git a/examples/storm-perf/src/main/java/org/apache/storm/perf/spout/StringGenSpout.java b/examples/storm-perf/src/main/java/org/apache/storm/perf/spout/StringGenSpout.java
index 6adb2e3..530f7b6 100755
--- a/examples/storm-perf/src/main/java/org/apache/storm/perf/spout/StringGenSpout.java
+++ b/examples/storm-perf/src/main/java/org/apache/storm/perf/spout/StringGenSpout.java
@@ -18,6 +18,10 @@
 
 package org.apache.storm.perf.spout;
 
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
 
 import org.apache.commons.lang.RandomStringUtils;
 import org.apache.storm.spout.SpoutOutputCollector;
@@ -26,11 +30,6 @@ import org.apache.storm.topology.OutputFieldsDeclarer;
 import org.apache.storm.topology.base.BaseRichSpout;
 import org.apache.storm.tuple.Fields;
 
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-
 /** Spout pre-computes a list with 30k fixed length random strings.
  *  Emits sequentially from this list, over and over again.
  */
@@ -43,8 +42,8 @@ public class StringGenSpout extends BaseRichSpout {
     private String fieldName = DEFAULT_FIELD_NAME;
     private SpoutOutputCollector collector = null;
     ArrayList<String> records;
-    private int curr=0;
-    private int count=0;
+    private int curr = 0;
+    private int count = 0;
 
     public StringGenSpout(int strLen) {
         this.strLen = strLen;
@@ -57,7 +56,7 @@ public class StringGenSpout extends BaseRichSpout {
 
     @Override
     public void declareOutputFields(OutputFieldsDeclarer declarer) {
-        declarer.declare( new Fields(fieldName) );
+        declarer.declare(new Fields(fieldName));
     }
 
     @Override
@@ -78,7 +77,7 @@ public class StringGenSpout extends BaseRichSpout {
     @Override
     public void nextTuple() {
         List<Object> tuple;
-        if( curr < strCount ) {
+        if(curr < strCount) {
             tuple = Collections.singletonList((Object) records.get(curr));
             ++curr;
             collector.emit(tuple, ++count);

http://git-wip-us.apache.org/repos/asf/storm/blob/bc4c4807/examples/storm-perf/src/main/java/org/apache/storm/perf/spout/WordGenSpout.java
----------------------------------------------------------------------
diff --git a/examples/storm-perf/src/main/java/org/apache/storm/perf/spout/WordGenSpout.java b/examples/storm-perf/src/main/java/org/apache/storm/perf/spout/WordGenSpout.java
new file mode 100644
index 0000000..92a2f53
--- /dev/null
+++ b/examples/storm-perf/src/main/java/org/apache/storm/perf/spout/WordGenSpout.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License
+ */
+
+package org.apache.storm.perf.spout;
+
+import java.io.BufferedReader;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.util.ArrayList;
+import java.util.Map;
+
+import org.apache.storm.perf.utils.Helper;
+import org.apache.storm.spout.SpoutOutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.topology.base.BaseRichSpout;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Values;
+import org.apache.storm.perf.ThroughputMeter;
+
+/**
+ * Spout that reads every whitespace-separated word of a text file into memory when opened and
+ * then emits one word per {@link #nextTuple()} call, cycling through the list indefinitely.
+ */
+public class WordGenSpout extends BaseRichSpout {
+    public static final String FIELDS = "word";
+    private static final long serialVersionUID = -2582705611472467172L;
+    private final String file;
+    private boolean ackEnabled = true;
+    private SpoutOutputCollector collector;
+
+    private long count = 0;
+    private int index = 0;
+    private ThroughputMeter emitMeter;
+    private ArrayList<String> words;
+
+    /**
+     * Creates the spout.
+     *
+     * @param file path of the text file to take the words from; it is read in open()
+     */
+    public WordGenSpout(String file) {
+        this.file = file;
+    }
+
+    /**
+     * Stores the collector, disables acking when "topology.acker.executors" resolves to 0 and
+     * loads the word list.
+     *
+     * @throws IllegalArgumentException if the file contains no words
+     * @throws RuntimeException if the file cannot be opened or read
+     */
+    @Override
+    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
+        this.collector = collector;
+        Integer ackers = Helper.getInt(conf, "topology.acker.executors", 0);
+        if (ackers.equals(0)) {
+            this.ackEnabled = false;
+        }
+        words = readWords(file);
+        if (words.isEmpty()) {
+            // fail with a clear message here rather than with an IndexOutOfBoundsException in nextTuple()
+            throw new IllegalArgumentException("No words found in file: " + file);
+        }
+        emitMeter = new ThroughputMeter("WordGenSpout emits");
+    }
+
+    /**
+     * Emits the next word and records the emit. The index is advanced before the lookup, so with
+     * more than one word the first tuple carries the second word and the first word follows once
+     * the index wraps around. When acking is enabled the running emit count is the message id.
+     */
+    @Override
+    public void nextTuple() {
+        index = (index < words.size() - 1) ? index + 1 : 0;
+        String word = words.get(index);
+        if (ackEnabled) {
+            collector.emit(new Values(word), count);
+            count++;
+        } else {
+            collector.emit(new Values(word));
+        }
+        emitMeter.record();
+    }
+
+    @Override
+    public void declareOutputFields(OutputFieldsDeclarer declarer) {
+        declarer.declare(new Fields(FIELDS));
+    }
+
+    /**
+     * Reads a text file and splits each line on whitespace.
+     *
+     * @param file path of the file to read
+     * @return all words in file order, duplicates included, empty tokens dropped
+     * @throws RuntimeException if the file cannot be opened or read
+     */
+    public static ArrayList<String> readWords(String file) {
+        ArrayList<String> words = new ArrayList<>();
+        // try-with-resources closes the reader on every path and keeps a read failure as the primary error
+        try (BufferedReader reader = new BufferedReader(new InputStreamReader(new FileInputStream(file)))) {
+            String line;
+            while ((line = reader.readLine()) != null) {
+                for (String word : line.split("\\s+")) {
+                    // split() produces an empty token for blank lines and leading whitespace
+                    if (!word.isEmpty()) {
+                        words.add(word);
+                    }
+                }
+            }
+        } catch (IOException e) {
+            throw new RuntimeException("Reading file failed: " + file, e);
+        }
+        return words;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/bc4c4807/examples/storm-perf/src/main/java/org/apache/storm/perf/utils/BasicMetricsCollector.java
----------------------------------------------------------------------
diff --git a/examples/storm-perf/src/main/java/org/apache/storm/perf/utils/BasicMetricsCollector.java b/examples/storm-perf/src/main/java/org/apache/storm/perf/utils/BasicMetricsCollector.java
index 97c1aa9..7cfd354 100755
--- a/examples/storm-perf/src/main/java/org/apache/storm/perf/utils/BasicMetricsCollector.java
+++ b/examples/storm-perf/src/main/java/org/apache/storm/perf/utils/BasicMetricsCollector.java
@@ -18,27 +18,20 @@
 
 package org.apache.storm.perf.utils;
 
-import org.apache.storm.generated.Nimbus;
-import org.apache.storm.utils.Utils;
-import org.apache.log4j.Logger;
-
 import java.io.PrintWriter;
-import java.util.*;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedHashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
 
+import org.apache.log4j.Logger;
+import org.apache.storm.generated.Nimbus;
+import org.apache.storm.utils.Utils;
 
 public class BasicMetricsCollector implements AutoCloseable {
-    private PrintWriter dataWriter;
-    private long startTime=0;
-
-    public enum MetricsItem {
-        TOPOLOGY_STATS,
-        XSFER_RATE,
-        SPOUT_THROUGHPUT,
-        SPOUT_LATENCY,
-        ALL
-    }
-
-
     /* headers */
     public static final String TIME = "elapsed (sec)";
     public static final String TIME_FORMAT = "%d";
@@ -54,29 +47,27 @@ public class BasicMetricsCollector implements AutoCloseable {
     public static final String SPOUT_ACKED = "spout_acks";
     public static final String SPOUT_THROUGHPUT = "spout_throughput (acks/s)";
     public static final String SPOUT_AVG_COMPLETE_LATENCY = "spout_avg_complete_latency(ms)";
-    public static final String SPOUT_AVG_LATENCY_FORMAT = "%.1f";
+    public static final String SPOUT_AVG_LATENCY_FORMAT = "%.3f";
     public static final String SPOUT_MAX_COMPLETE_LATENCY = "spout_max_complete_latency(ms)";
-    public static final String SPOUT_MAX_LATENCY_FORMAT = "%.1f";
+    public static final String SPOUT_MAX_LATENCY_FORMAT = "%.3f";
     private static final Logger LOG = Logger.getLogger(BasicMetricsCollector.class);
     final MetricsCollectorConfig config;
     //    final StormTopology topology;
     final Set<String> header = new LinkedHashSet<String>();
     final Map<String, String> metrics = new HashMap<String, String>();
-    int lineNumber = 0;
-
     final boolean collectTopologyStats;
     final boolean collectExecutorStats;
     final boolean collectThroughput;
-
     final boolean collectSpoutThroughput;
     final boolean collectSpoutLatency;
-
+    int lineNumber = 0;
+    boolean first = true;
+    private PrintWriter dataWriter;
+    private long startTime = 0;
     private MetricsSample lastSample;
     private MetricsSample curSample;
     private double maxLatency = 0;
 
-    boolean first = true;
-    
     public BasicMetricsCollector(String topoName, Map<String, Object> topoConfig) {
         Set<MetricsItem> items = getMetricsToCollect();
         this.config = new MetricsCollectorConfig(topoName, topoConfig);
@@ -88,7 +79,7 @@ public class BasicMetricsCollector implements AutoCloseable {
         dataWriter = new PrintWriter(System.err);
     }
 
-    private Set<MetricsItem>  getMetricsToCollect() {
+    private Set<MetricsItem> getMetricsToCollect() {
         Set<MetricsItem> result = new HashSet<>();
         result.add(MetricsItem.ALL);
         return result;
@@ -119,7 +110,7 @@ public class BasicMetricsCollector implements AutoCloseable {
     }
 
     boolean updateStats(PrintWriter writer)
-            throws Exception {
+        throws Exception {
         if (collectTopologyStats) {
             updateTopologyStats();
         }
@@ -169,14 +160,13 @@ public class BasicMetricsCollector implements AutoCloseable {
                 this.maxLatency = latency;
             }
             metrics.put(SPOUT_AVG_COMPLETE_LATENCY,
-                    String.format(SPOUT_AVG_LATENCY_FORMAT, latency));
+                String.format(SPOUT_AVG_LATENCY_FORMAT, latency));
             metrics.put(SPOUT_MAX_COMPLETE_LATENCY,
-                    String.format(SPOUT_MAX_LATENCY_FORMAT, this.maxLatency));
+                String.format(SPOUT_MAX_LATENCY_FORMAT, this.maxLatency));
 
         }
     }
 
-
     void writeHeader(PrintWriter writer) {
         header.add(TIME);
         if (collectTopologyStats) {
@@ -219,34 +209,39 @@ public class BasicMetricsCollector implements AutoCloseable {
         writer.flush();
     }
 
-
     boolean collectTopologyStats(Set<MetricsItem> items) {
-        return items.contains(MetricsItem.ALL) ||
-                items.contains(MetricsItem.TOPOLOGY_STATS);
+        return items.contains(MetricsItem.ALL)
+            || items.contains(MetricsItem.TOPOLOGY_STATS);
     }
 
     boolean collectExecutorStats(Set<MetricsItem> items) {
-        return items.contains(MetricsItem.ALL) ||
-                items.contains(MetricsItem.XSFER_RATE) ||
-                items.contains(MetricsItem.SPOUT_LATENCY);
+        return items.contains(MetricsItem.ALL)
+            || items.contains(MetricsItem.XSFER_RATE)
+            || items.contains(MetricsItem.SPOUT_LATENCY);
     }
 
     boolean collectThroughput(Set<MetricsItem> items) {
-        return items.contains(MetricsItem.ALL) ||
-                items.contains(MetricsItem.XSFER_RATE);
+        return items.contains(MetricsItem.ALL)
+            || items.contains(MetricsItem.XSFER_RATE);
     }
 
     boolean collectSpoutThroughput(Set<MetricsItem> items) {
-        return items.contains(MetricsItem.ALL) ||
-                items.contains(MetricsItem.SPOUT_THROUGHPUT);
+        return items.contains(MetricsItem.ALL)
+            || items.contains(MetricsItem.SPOUT_THROUGHPUT);
     }
 
     boolean collectSpoutLatency(Set<MetricsItem> items) {
-        return items.contains(MetricsItem.ALL) ||
-                items.contains(MetricsItem.SPOUT_LATENCY);
+        return items.contains(MetricsItem.ALL)
+            || items.contains(MetricsItem.SPOUT_LATENCY);
     }
 
-
+    public enum MetricsItem {
+        TOPOLOGY_STATS,
+        XSFER_RATE,
+        SPOUT_THROUGHPUT,
+        SPOUT_LATENCY,
+        ALL
+    }
 
     public static class MetricsCollectorConfig {
         private static final Logger LOG = Logger.getLogger(MetricsCollectorConfig.class);

http://git-wip-us.apache.org/repos/asf/storm/blob/bc4c4807/examples/storm-perf/src/main/java/org/apache/storm/perf/utils/Helper.java
----------------------------------------------------------------------
diff --git a/examples/storm-perf/src/main/java/org/apache/storm/perf/utils/Helper.java b/examples/storm-perf/src/main/java/org/apache/storm/perf/utils/Helper.java
index e34cb6e..73bfdda 100755
--- a/examples/storm-perf/src/main/java/org/apache/storm/perf/utils/Helper.java
+++ b/examples/storm-perf/src/main/java/org/apache/storm/perf/utils/Helper.java
@@ -75,6 +75,7 @@ public class Helper {
         Runtime.getRuntime().addShutdownHook(new Thread() {
             public void run() {
                 try {
+                    System.out.println("Killing...");
                     Helper.kill(client, topoName);
                     System.out.println("Killed Topology");
                 } catch (Exception e) {
@@ -84,7 +85,8 @@ public class Helper {
         });
     }
 
-    public static void runOnClusterAndPrintMetrics(int durationSec, String topoName, Map<String, Object> topoConf, StormTopology topology) throws Exception {
+    public static void runOnClusterAndPrintMetrics(int durationSec, String topoName, Map<String, Object> topoConf, StormTopology topology)
+             throws Exception {
         // submit topology
         StormSubmitter.submitTopologyWithProgressBar(topoName, topoConf, topology);
         setupShutdownHook(topoName); // handle Ctrl-C

http://git-wip-us.apache.org/repos/asf/storm/blob/bc4c4807/examples/storm-perf/src/main/java/org/apache/storm/perf/utils/IdentityBolt.java
----------------------------------------------------------------------
diff --git a/examples/storm-perf/src/main/java/org/apache/storm/perf/utils/IdentityBolt.java b/examples/storm-perf/src/main/java/org/apache/storm/perf/utils/IdentityBolt.java
index 54f8ee1..f950203 100755
--- a/examples/storm-perf/src/main/java/org/apache/storm/perf/utils/IdentityBolt.java
+++ b/examples/storm-perf/src/main/java/org/apache/storm/perf/utils/IdentityBolt.java
@@ -18,16 +18,13 @@
 
 package org.apache.storm.perf.utils;
 
+import java.util.Map;
 
 import org.apache.storm.task.OutputCollector;
 import org.apache.storm.task.TopologyContext;
 import org.apache.storm.topology.OutputFieldsDeclarer;
 import org.apache.storm.topology.base.BaseRichBolt;
 import org.apache.storm.tuple.Tuple;
-import org.apache.storm.tuple.Values;
-
-import java.util.Map;
-
 
 public class IdentityBolt extends BaseRichBolt {
     private OutputCollector collector;
@@ -45,7 +42,6 @@ public class IdentityBolt extends BaseRichBolt {
 
     @Override
     public void declareOutputFields(OutputFieldsDeclarer declarer) {
-
     }
 }
 

http://git-wip-us.apache.org/repos/asf/storm/blob/bc4c4807/examples/storm-perf/src/main/java/org/apache/storm/perf/utils/MetricsSample.java
----------------------------------------------------------------------
diff --git a/examples/storm-perf/src/main/java/org/apache/storm/perf/utils/MetricsSample.java b/examples/storm-perf/src/main/java/org/apache/storm/perf/utils/MetricsSample.java
index f1177b6..f462bc7 100755
--- a/examples/storm-perf/src/main/java/org/apache/storm/perf/utils/MetricsSample.java
+++ b/examples/storm-perf/src/main/java/org/apache/storm/perf/utils/MetricsSample.java
@@ -33,16 +33,16 @@ import org.apache.storm.utils.Utils;
 
 public class MetricsSample {
 
-    private long sampleTime = -1;
-    private long totalTransferred = 0l;
-    private long totalEmitted = 0l;
-    private long totalAcked = 0l;
-    private long totalFailed = 0l;
+    private long sampleTime = -1L;
+    private long totalTransferred = 0L;
+    private long totalEmitted = 0L;
+    private long totalAcked = 0L;
+    private long totalFailed = 0L;
 
     private double totalLatency;
 
-    private long spoutEmitted = 0l;
-    private long spoutTransferred = 0l;
+    private long spoutEmitted = 0L;
+    private long spoutTransferred = 0L;
     private int spoutExecutors = 0;
 
     private int numSupervisors = 0;
@@ -64,7 +64,7 @@ public class MetricsSample {
         int topologyTasks = topSummary.get_num_tasks();
         TopologyInfo topInfo = client.getTopologyInfo(topSummary.get_id());
 
-        MetricsSample sample =  getMetricsSample( topInfo);
+        MetricsSample sample = getMetricsSample(topInfo);
         sample.numWorkers = topologyWorkers;
         sample.numExecutors = topologyExecutors;
         sample.numTasks = topologyTasks;
@@ -75,62 +75,62 @@ public class MetricsSample {
         List<ExecutorSummary> executorSummaries = topInfo.get_executors();
 
         // totals
-        long totalTransferred = 0l;
-        long totalEmitted = 0l;
-        long totalAcked = 0l;
-        long totalFailed = 0l;
+        long totalTransferred = 0L;
+        long totalEmitted = 0L;
+        long totalAcked = 0L;
+        long totalFailed = 0L;
 
         // number of spout executors
         int spoutExecCount = 0;
         double spoutLatencySum = 0.0;
 
-        long spoutEmitted = 0l;
-        long spoutTransferred = 0l;
+        long spoutEmitted = 0L;
+        long spoutTransferred = 0L;
 
         // Executor summaries
-        for(ExecutorSummary executorSummary : executorSummaries){
+        for (ExecutorSummary executorSummary : executorSummaries) {
             ExecutorStats executorStats = executorSummary.get_stats();
-            if(executorStats == null){
+            if (executorStats == null) {
                 continue;
             }
 
             ExecutorSpecificStats executorSpecificStats = executorStats.get_specific();
-            if(executorSpecificStats == null){
+            if (executorSpecificStats == null) {
                 // bail out
                 continue;
             }
 
             // transferred totals
-            Map<String,Map<String,Long>> transferred = executorStats.get_transferred();
+            Map<String, Map<String, Long>> transferred = executorStats.get_transferred();
             Map<String, Long> txMap = transferred.get(":all-time");
-            if(txMap == null){
+            if (txMap == null) {
                 continue;
             }
-            for(String key : txMap.keySet()){
+            for (String key : txMap.keySet()) {
                 // todo, ignore the master batch coordinator ?
-                if(!Utils.isSystemId(key)){
+                if (!Utils.isSystemId(key)) {
                     Long count = txMap.get(key);
                     totalTransferred += count;
-                    if(executorSpecificStats.is_set_spout()){
+                    if (executorSpecificStats.is_set_spout()) {
                         spoutTransferred += count;
                     }
                 }
             }
 
             // we found a spout
-            if(executorSpecificStats.isSet(2)) { // spout
+            if (executorSpecificStats.isSet(2)) { // spout
 
                 SpoutStats spoutStats = executorSpecificStats.get_spout();
                 Map<String, Long> acked = spoutStats.get_acked().get(":all-time");
-                if(acked != null){
-                    for(String key : acked.keySet()) {
+                if (acked != null) {
+                    for (String key : acked.keySet()) {
                         totalAcked += acked.get(key);
                     }
                 }
 
                 Map<String, Long> failed = spoutStats.get_failed().get(":all-time");
-                if(failed != null){
-                    for(String key : failed.keySet()) {
+                if (failed != null) {
+                    for (String key : failed.keySet()) {
                         totalFailed += failed.get(key);
                     }
                 }
@@ -154,9 +154,9 @@ public class MetricsSample {
         MetricsSample ret = new MetricsSample();
         ret.totalEmitted = totalEmitted;
         ret.totalTransferred = totalTransferred;
-        ret.totalAcked  = totalAcked;
+        ret.totalAcked = totalAcked;
         ret.totalFailed = totalFailed;
-        ret.totalLatency = spoutLatencySum/spoutExecCount;
+        ret.totalLatency = spoutLatencySum / spoutExecCount;
         ret.spoutEmitted = spoutEmitted;
         ret.spoutTransferred = spoutTransferred;
         ret.sampleTime = System.currentTimeMillis();
@@ -178,7 +178,6 @@ public class MetricsSample {
     }
 
 
-
     // getters
     public long getSampleTime() {
         return sampleTime;
@@ -228,7 +227,7 @@ public class MetricsSample {
         return totalSlots;
     }
 
-    public int getSpoutExecutors(){
+    public int getSpoutExecutors() {
         return this.spoutExecutors;
     }
 

http://git-wip-us.apache.org/repos/asf/storm/blob/bc4c4807/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/common/EsTestUtil.java
----------------------------------------------------------------------
diff --git a/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/common/EsTestUtil.java b/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/common/EsTestUtil.java
index 7d8c51f..6712768 100644
--- a/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/common/EsTestUtil.java
+++ b/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/common/EsTestUtil.java
@@ -69,7 +69,7 @@ public class EsTestUtil {
                 return new Fields("source", "index", "type", "id");
             }
         };
-        return new TupleImpl(topologyContext, new Values(source, index, type, id), 1, "");
+        return new TupleImpl(topologyContext, new Values(source, index, type, id), source, 1, "");
     }
 
     public static TridentTuple generateTestTridentTuple(String source, String index, String type, String id) {

http://git-wip-us.apache.org/repos/asf/storm/blob/bc4c4807/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/spout/SpoutOutputCollectorMock.java
----------------------------------------------------------------------
diff --git a/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/spout/SpoutOutputCollectorMock.java b/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/spout/SpoutOutputCollectorMock.java
index 88bafd2..ac724e8 100755
--- a/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/spout/SpoutOutputCollectorMock.java
+++ b/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/spout/SpoutOutputCollectorMock.java
@@ -56,6 +56,11 @@ public class SpoutOutputCollectorMock implements ISpoutOutputCollector {
   }
 
   @Override
+  public void flush() {
+    // NO-OP
+  }
+
+  @Override
   public void reportError(Throwable arg0) {
   }
 

http://git-wip-us.apache.org/repos/asf/storm/blob/bc4c4807/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/trident/TridentCollectorMock.java
----------------------------------------------------------------------
diff --git a/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/trident/TridentCollectorMock.java b/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/trident/TridentCollectorMock.java
index 3fe5de0..c42f769 100755
--- a/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/trident/TridentCollectorMock.java
+++ b/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/trident/TridentCollectorMock.java
@@ -39,6 +39,11 @@ public class TridentCollectorMock implements TridentCollector {
   }
 
   @Override
+  public void flush() {
+    // NO-OP
+  }
+
+  @Override
   public void reportError(Throwable arg0) {
   }
 

http://git-wip-us.apache.org/repos/asf/storm/blob/bc4c4807/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/bolt/AvroGenericRecordBoltTest.java
----------------------------------------------------------------------
diff --git a/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/bolt/AvroGenericRecordBoltTest.java b/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/bolt/AvroGenericRecordBoltTest.java
index 0f6bd28..cac55c9 100644
--- a/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/bolt/AvroGenericRecordBoltTest.java
+++ b/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/bolt/AvroGenericRecordBoltTest.java
@@ -229,7 +229,7 @@ public class AvroGenericRecordBoltTest {
                 return new Fields("record");
             }
         };
-        return new TupleImpl(topologyContext, new Values(record), 1, "");
+        return new TupleImpl(topologyContext, new Values(record), topologyContext.getComponentId(1), 1, "");
     }
 
     private void verifyAllAvroFiles(String path) throws IOException {

http://git-wip-us.apache.org/repos/asf/storm/blob/bc4c4807/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/bolt/TestHdfsBolt.java
----------------------------------------------------------------------
diff --git a/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/bolt/TestHdfsBolt.java b/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/bolt/TestHdfsBolt.java
index de7316f..b872029 100644
--- a/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/bolt/TestHdfsBolt.java
+++ b/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/bolt/TestHdfsBolt.java
@@ -251,7 +251,7 @@ public class TestHdfsBolt {
                 return new Fields("id", "msg","city","state");
             }
         };
-        return new TupleImpl(topologyContext, new Values(id, msg,city,state), 1, "");
+        return new TupleImpl(topologyContext, new Values(id, msg,city,state), topologyContext.getComponentId(1), 1, "");
     }
 
     // Generally used to compare how files were actually written and compare to expectations based on total

http://git-wip-us.apache.org/repos/asf/storm/blob/bc4c4807/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/bolt/TestSequenceFileBolt.java
----------------------------------------------------------------------
diff --git a/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/bolt/TestSequenceFileBolt.java b/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/bolt/TestSequenceFileBolt.java
index f297ba9..71f5d4e 100644
--- a/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/bolt/TestSequenceFileBolt.java
+++ b/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/bolt/TestSequenceFileBolt.java
@@ -167,7 +167,7 @@ public class TestSequenceFileBolt {
                 return new Fields("key", "value");
             }
         };
-        return new TupleImpl(topologyContext, new Values(key, value), 1, "");
+        return new TupleImpl(topologyContext, new Values(key, value), topologyContext.getComponentId(1), 1, "");
     }
 
     // Generally used to compare how files were actually written and compare to expectations based on total

http://git-wip-us.apache.org/repos/asf/storm/blob/bc4c4807/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/bolt/format/TestSimpleFileNameFormat.java
----------------------------------------------------------------------
diff --git a/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/bolt/format/TestSimpleFileNameFormat.java b/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/bolt/format/TestSimpleFileNameFormat.java
index 051e4b7..64f5dd8 100644
--- a/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/bolt/format/TestSimpleFileNameFormat.java
+++ b/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/bolt/format/TestSimpleFileNameFormat.java
@@ -31,8 +31,9 @@ public class TestSimpleFileNameFormat {
 
     @Test
     public void testDefaults() {
+        Map<String, Object> topoConf = new HashMap();
         SimpleFileNameFormat format = new SimpleFileNameFormat();
-        format.prepare(null, createTopologyContext());
+        format.prepare(null, createTopologyContext(topoConf));
         long now = System.currentTimeMillis();
         String path = format.getPath();
         String name = format.getName(1, now);
@@ -48,7 +49,8 @@ public class TestSimpleFileNameFormat {
             .withName("$TIME.$HOST.$COMPONENT.$TASK.$NUM.txt")
             .withPath("/mypath")
             .withTimeFormat("yyyy-MM-dd HH:mm:ss");
-        format.prepare(null, createTopologyContext());
+        Map<String, Object> topoConf = new HashMap();
+        format.prepare(null, createTopologyContext(topoConf));
         long now = System.currentTimeMillis();
         String path = format.getPath();
         String name = format.getName(1, now);
@@ -66,14 +68,15 @@ public class TestSimpleFileNameFormat {
 
     @Test(expected=IllegalArgumentException.class)
     public void testTimeFormat() {
+        Map<String, Object> topoConf = new HashMap();
         SimpleFileNameFormat format = new SimpleFileNameFormat()
         	.withTimeFormat("xyz");
-        format.prepare(null, createTopologyContext());
+        format.prepare(null, createTopologyContext(topoConf));
     }
     
-    private TopologyContext createTopologyContext(){
+    private TopologyContext createTopologyContext(Map<String, Object> topoConf){
     	Map<Integer, String> taskToComponent = new HashMap<Integer, String>();
         taskToComponent.put(7, "Xcom");
-    	return new TopologyContext(null, null, taskToComponent, null, null, null, null, null, null, 7, 6703, null, null, null, null, null, null);
+    	return new TopologyContext(null, topoConf, taskToComponent, null, null, null, null, null, null, 7, 6703, null, null, null, null, null, null);
     }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/bc4c4807/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/spout/TestHdfsSpout.java
----------------------------------------------------------------------
diff --git a/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/spout/TestHdfsSpout.java b/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/spout/TestHdfsSpout.java
index 6e2b140..b3909f3 100644
--- a/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/spout/TestHdfsSpout.java
+++ b/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/spout/TestHdfsSpout.java
@@ -589,9 +589,9 @@ public class TestHdfsSpout {
     }
 
     private Map getCommonConfigs() {
-        Map<String, Object> conf = new HashMap();
-        conf.put(Config.TOPOLOGY_ACKER_EXECUTORS, "0");
-        return conf;
+        Map<String, Object> topoConf = new HashMap();
+        topoConf.put(Config.TOPOLOGY_ACKER_EXECUTORS, "0");
+        return topoConf;
     }
 
     private AutoCloseableHdfsSpout makeSpout(String readerType, String[] outputFields) {
@@ -619,9 +619,9 @@ public class TestHdfsSpout {
         }
     }
 
-    private void openSpout(HdfsSpout spout, int spoutId, Map<String, Object> conf) {
+    private void openSpout(HdfsSpout spout, int spoutId, Map<String, Object> topoConf) {
         MockCollector collector = new MockCollector();
-        spout.open(conf, new MockTopologyContext(spoutId), collector);
+        spout.open(topoConf, new MockTopologyContext(spoutId, topoConf), collector);
     }
 
     /**
@@ -698,13 +698,18 @@ public class TestHdfsSpout {
         }
 
         @Override
-        public List<Integer> emit(String streamId, List<Object> tuple, Object messageId) {
+        public List<Integer> emit(List<Object> tuple, Object messageId) {
             lines.add(tuple.toString());
             items.add(HdfsUtils.Pair.of(messageId, tuple));
             return null;
         }
 
         @Override
+        public List<Integer> emit(String streamId, List<Object> tuple, Object messageId) {
+            return emit(tuple, messageId);
+        }
+
+        @Override
         public void emitDirect(int arg0, String arg1, List<Object> arg2, Object arg3) {
             throw new UnsupportedOperationException("NOT Implemented");
         }
@@ -747,9 +752,9 @@ public class TestHdfsSpout {
 
         private final int componentId;
 
-        public MockTopologyContext(int componentId) {
+        public MockTopologyContext(int componentId, Map<String, Object> topoConf) {
             // StormTopology topology, Map<String, Object> topoConf, Map<Integer, String> taskToComponent, Map<String, List<Integer>> componentToSortedTasks, Map<String, Map<String, Fields>> componentToStreamToFields, String stormId, String codeDir, String pidDir, Integer taskId, Integer workerPort, List<Integer> workerTasks, Map<String, Object> defaultResources, Map<String, Object> userResources, Map<String, Object> executorData, Map<Integer, Map<Integer, Map<String, IMetric>>> registeredMetrics, Atom openOrPrepareWasCalled
-            super(null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null);
+            super(null, topoConf, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null);
             this.componentId = componentId;
         }
 

http://git-wip-us.apache.org/repos/asf/storm/blob/bc4c4807/external/storm-hive/src/test/java/org/apache/storm/hive/bolt/TestHiveBolt.java
----------------------------------------------------------------------
diff --git a/external/storm-hive/src/test/java/org/apache/storm/hive/bolt/TestHiveBolt.java b/external/storm-hive/src/test/java/org/apache/storm/hive/bolt/TestHiveBolt.java
index f7cd046..dfa62ee 100644
--- a/external/storm-hive/src/test/java/org/apache/storm/hive/bolt/TestHiveBolt.java
+++ b/external/storm-hive/src/test/java/org/apache/storm/hive/bolt/TestHiveBolt.java
@@ -464,7 +464,7 @@ public class TestHiveBolt {
                     return new Fields("id", "msg","city","state");
                 }
             };
-        return new TupleImpl(topologyContext, new Values(id, msg,city,state), 1, "");
+        return new TupleImpl(topologyContext, new Values(id, msg,city,state), topologyContext.getComponentId(1), 1, "");
     }
 
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/bc4c4807/external/storm-hive/src/test/java/org/apache/storm/hive/common/TestHiveWriter.java
----------------------------------------------------------------------
diff --git a/external/storm-hive/src/test/java/org/apache/storm/hive/common/TestHiveWriter.java b/external/storm-hive/src/test/java/org/apache/storm/hive/common/TestHiveWriter.java
index a53033f..345be58 100644
--- a/external/storm-hive/src/test/java/org/apache/storm/hive/common/TestHiveWriter.java
+++ b/external/storm-hive/src/test/java/org/apache/storm/hive/common/TestHiveWriter.java
@@ -168,7 +168,7 @@ public class TestHiveWriter {
                     return new Fields("id", "msg");
                 }
             };
-        return new TupleImpl(topologyContext, new Values(id, msg), 1, "");
+        return new TupleImpl(topologyContext, new Values(id, msg), topologyContext.getComponentId(1), 1, "");
     }
 
     private void writeTuples(HiveWriter writer, HiveMapper mapper, int count)

http://git-wip-us.apache.org/repos/asf/storm/blob/bc4c4807/external/storm-jms/src/test/java/org/apache/storm/jms/spout/MockSpoutOutputCollector.java
----------------------------------------------------------------------
diff --git a/external/storm-jms/src/test/java/org/apache/storm/jms/spout/MockSpoutOutputCollector.java b/external/storm-jms/src/test/java/org/apache/storm/jms/spout/MockSpoutOutputCollector.java
index a5a6c51..4e05646 100644
--- a/external/storm-jms/src/test/java/org/apache/storm/jms/spout/MockSpoutOutputCollector.java
+++ b/external/storm-jms/src/test/java/org/apache/storm/jms/spout/MockSpoutOutputCollector.java
@@ -37,6 +37,11 @@ public class MockSpoutOutputCollector implements ISpoutOutputCollector {
     }
 
     @Override
+    public void flush() {
+        //NO-OP
+    }
+
+    @Override
     public void reportError(Throwable error) {
     }
 

http://git-wip-us.apache.org/repos/asf/storm/blob/bc4c4807/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
index abd6774..39bdb93 100644
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
@@ -440,7 +440,7 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
         } else {
             final OffsetAndMetadata committedOffset = kafkaConsumer.committed(tp);
             if (isAtLeastOnceProcessing()
-                && committedOffset != null 
+                && committedOffset != null
                 && committedOffset.offset() > record.offset()
                 && commitMetadataManager.isOffsetCommittedByThisTopology(tp,
                 committedOffset,

http://git-wip-us.apache.org/repos/asf/storm/blob/bc4c4807/external/storm-kafka/src/test/org/apache/storm/kafka/PartitionManagerTest.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/test/org/apache/storm/kafka/PartitionManagerTest.java b/external/storm-kafka/src/test/org/apache/storm/kafka/PartitionManagerTest.java
index 888ecde..805913d 100644
--- a/external/storm-kafka/src/test/org/apache/storm/kafka/PartitionManagerTest.java
+++ b/external/storm-kafka/src/test/org/apache/storm/kafka/PartitionManagerTest.java
@@ -238,6 +238,10 @@ public class PartitionManagerTest {
         public long getPendingCount() {
             throw new UnsupportedOperationException();
         }
+
+        @Override
+        public void flush() {
+        }
     }
 
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/storm/blob/bc4c4807/external/storm-kafka/src/test/org/apache/storm/kafka/bolt/KafkaBoltTest.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/test/org/apache/storm/kafka/bolt/KafkaBoltTest.java b/external/storm-kafka/src/test/org/apache/storm/kafka/bolt/KafkaBoltTest.java
index 5f3753d..ed26157 100644
--- a/external/storm-kafka/src/test/org/apache/storm/kafka/bolt/KafkaBoltTest.java
+++ b/external/storm-kafka/src/test/org/apache/storm/kafka/bolt/KafkaBoltTest.java
@@ -296,7 +296,7 @@ public class KafkaBoltTest {
                 return new Fields("key", "message");
             }
         };
-        return new TupleImpl(topologyContext, new Values(key, message), 1, "");
+        return new TupleImpl(topologyContext, new Values(key, message), topologyContext.getComponentId(1), 1, "");
     }
 
     private Tuple generateTestTuple(Object message) {
@@ -307,7 +307,7 @@ public class KafkaBoltTest {
                 return new Fields("message");
             }
         };
-        return new TupleImpl(topologyContext, new Values(message), 1, "");
+        return new TupleImpl(topologyContext, new Values(message), topologyContext.getComponentId(1), 1, "");
     }
 
     private Tuple mockTickTuple() {

http://git-wip-us.apache.org/repos/asf/storm/blob/bc4c4807/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index cb89376..2260d66 100644
--- a/pom.xml
+++ b/pom.xml
@@ -259,6 +259,7 @@
         <snakeyaml.version>1.11</snakeyaml.version>
         <httpclient.version>4.3.3</httpclient.version>
         <clojure.tools.cli.version>0.2.4</clojure.tools.cli.version>
+        <jctools.version>2.0.1</jctools.version>
         <disruptor.version>3.3.2</disruptor.version>
         <jgrapht.version>0.9.0</jgrapht.version>
         <guava.version>16.0.1</guava.version>
@@ -876,6 +877,11 @@
                 <version>${clojure.tools.cli.version}</version>
             </dependency>
             <dependency>
+                <groupId>org.jctools</groupId>
+                <artifactId>jctools-core</artifactId>
+                <version>${jctools.version}</version>
+            </dependency>
+            <dependency>
                 <groupId>com.lmax</groupId>
                 <artifactId>disruptor</artifactId>
                 <version>${disruptor.version}</version>
@@ -1094,7 +1100,7 @@
                         <includes>
                             <include>${java.unit.test.include}</include>
                         </includes>
-                        <argLine>-Xmx1536m</argLine>
+                        <argLine>-Xmx3g</argLine>
                         <trimStackTrace>false</trimStackTrace>
                         <forkCount>1.0C</forkCount>
                         <reuseForks>true</reuseForks>

http://git-wip-us.apache.org/repos/asf/storm/blob/bc4c4807/storm-client/pom.xml
----------------------------------------------------------------------
diff --git a/storm-client/pom.xml b/storm-client/pom.xml
index e0c9609..2e073a8 100644
--- a/storm-client/pom.xml
+++ b/storm-client/pom.xml
@@ -110,10 +110,10 @@
             <artifactId>commons-collections</artifactId>
         </dependency>
 
-        <!-- disruptor -->
+        <!-- jctools -->
         <dependency>
-            <groupId>com.lmax</groupId>
-            <artifactId>disruptor</artifactId>
+            <groupId>org.jctools</groupId>
+            <artifactId>jctools-core</artifactId>
         </dependency>
         
         <!-- JAXB -->

http://git-wip-us.apache.org/repos/asf/storm/blob/bc4c4807/storm-client/src/jvm/org/apache/storm/Config.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/Config.java b/storm-client/src/jvm/org/apache/storm/Config.java
index 5e9ea48..84825cf 100644
--- a/storm-client/src/jvm/org/apache/storm/Config.java
+++ b/storm-client/src/jvm/org/apache/storm/Config.java
@@ -15,10 +15,12 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.storm;
 
 import java.util.Arrays;
 import org.apache.storm.metric.IEventLogger;
+import org.apache.storm.policy.IWaitStrategy;
 import org.apache.storm.serialization.IKryoDecorator;
 import org.apache.storm.serialization.IKryoFactory;
 import org.apache.storm.validation.ConfigValidation;
@@ -120,53 +122,14 @@ public class Config extends HashMap<String, Object> {
     public static final String TASK_CREDENTIALS_POLL_SECS = "task.credentials.poll.secs";
 
     /**
-     * How often to poll for changed topology backpressure flag from ZK
-     */
-    @isInteger
-    @isPositiveNumber
-    public static final String TASK_BACKPRESSURE_POLL_SECS = "task.backpressure.poll.secs";
-
-    /**
-     * Whether to enable backpressure in for a certain topology
+     * Whether to enable backpressure for a certain topology.
+     * @deprecated In Storm 2.0. Retained for enabling transition from 1.x. Will be removed soon.
      */
+    @Deprecated
     @isBoolean
     public static final String TOPOLOGY_BACKPRESSURE_ENABLE = "topology.backpressure.enable";
 
     /**
-     * This signifies the tuple congestion in a disruptor queue.
-     * When the used ratio of a disruptor queue is higher than the high watermark,
-     * the backpressure scheme, if enabled, should slow down the tuple sending speed of
-     * the spouts until reaching the low watermark.
-     */
-    @isPositiveNumber
-    public static final String BACKPRESSURE_DISRUPTOR_HIGH_WATERMARK="backpressure.disruptor.high.watermark";
-
-    /**
-     * This signifies a state that a disruptor queue has left the congestion.
-     * If the used ratio of a disruptor queue is lower than the low watermark,
-     * it will unset the backpressure flag.
-     */
-    @isPositiveNumber
-    public static final String BACKPRESSURE_DISRUPTOR_LOW_WATERMARK="backpressure.disruptor.low.watermark";
-
-    /**
-     * How long until the backpressure znode is invalid.
-     * It's measured by the data (timestamp) of the znode, not the ctime (creation time) or mtime (modification time), etc.
-     * This must be larger than BACKPRESSURE_ZNODE_UPDATE_FREQ_SECS.
-     */
-    @isInteger
-    @isPositiveNumber
-    public static final String BACKPRESSURE_ZNODE_TIMEOUT_SECS = "backpressure.znode.timeout.secs";
-
-    /**
-     * How often will the data (timestamp) of backpressure znode be updated.
-     * But if the worker backpressure status (on/off) changes, the znode will be updated anyway.
-     */
-    @isInteger
-    @isPositiveNumber
-    public static final String BACKPRESSURE_ZNODE_UPDATE_FREQ_SECS = "backpressure.znode.update.freq.secs";
-
-    /**
      * A list of users that are allowed to interact with the topology.  To use this set
      * nimbus.authorizer to org.apache.storm.security.auth.authorizer.SimpleACLAuthorizer
      */
@@ -630,16 +593,6 @@ public class Config extends HashMap<String, Object> {
     public static final String TOPOLOGY_ENVIRONMENT="topology.environment";
 
     /*
-     * Topology-specific option to disable/enable bolt's outgoing overflow buffer.
-     * Enabling this option ensures that the bolt can always clear the incoming messages,
-     * preventing live-lock for the topology with cyclic flow.
-     * The overflow buffer can fill degrading the performance gradually,
-     * eventually running out of memory.
-     */
-    @isBoolean
-    public static final String TOPOLOGY_BOLTS_OUTGOING_OVERFLOW_BUFFER_ENABLE="topology.bolts.outgoing.overflow.buffer.enable";
-
-    /*
      * Bolt-specific configuration for windowed bolts to specify the window length as a count of number of tuples
      * in the window.
      */
@@ -719,23 +672,25 @@ public class Config extends HashMap<String, Object> {
     public static final String TOPOLOGY_AUTO_TASK_HOOKS="topology.auto.task.hooks";
 
     /**
-     * The size of the Disruptor receive queue for each executor. Must be a power of 2.
+     * The size of the receive queue for each executor.
      */
-    @isPowerOf2
+    @isPositiveNumber
+    @isInteger
     public static final String TOPOLOGY_EXECUTOR_RECEIVE_BUFFER_SIZE="topology.executor.receive.buffer.size";
 
     /**
-     * The size of the Disruptor send queue for each executor. Must be a power of 2.
+     * The size of the transfer queue for each worker.
      */
-    @isPowerOf2
-    public static final String TOPOLOGY_EXECUTOR_SEND_BUFFER_SIZE="topology.executor.send.buffer.size";
+    @isPositiveNumber
+    @isInteger
+    public static final String TOPOLOGY_TRANSFER_BUFFER_SIZE="topology.transfer.buffer.size";
 
     /**
-     * The size of the Disruptor transfer queue for each worker.
+     * The batch size for the transfer queue of each worker.
      */
+    @isPositiveNumber
     @isInteger
-    @isPowerOf2
-    public static final String TOPOLOGY_TRANSFER_BUFFER_SIZE="topology.transfer.buffer.size";
+    public static final String TOPOLOGY_TRANSFER_BATCH_SIZE="topology.transfer.batch.size";
 
     /**
      * How often a tick tuple from the "__system" component and "__tick" stream should be sent
@@ -745,13 +700,40 @@ public class Config extends HashMap<String, Object> {
     public static final String TOPOLOGY_TICK_TUPLE_FREQ_SECS="topology.tick.tuple.freq.secs";
 
     /**
-     * @deprecated this is no longer supported
-     * Configure the wait strategy used for internal queuing. Can be used to tradeoff latency
-     * vs. throughput
+     * The number of tuples to batch before sending to the destination executor.
      */
-    @Deprecated
-    @isString
-    public static final String TOPOLOGY_DISRUPTOR_WAIT_STRATEGY="topology.disruptor.wait.strategy";
+    @isInteger
+    @isPositiveNumber
+    @NotNull
+    public static final String TOPOLOGY_PRODUCER_BATCH_SIZE="topology.producer.batch.size";
+
+    /**
+     * If the number of items in a task's overflowQ exceeds this, new messages coming from other workers to this task will be dropped.
+     * This prevents OutOfMemoryError that can occur in rare scenarios in the presence of BackPressure. This affects
+     * only inter-worker messages. Messages originating from within the same worker will not be dropped.
+     */
+    @isInteger
+    @isPositiveNumber(includeZero = true)
+    @NotNull
+    public static final String TOPOLOGY_EXECUTOR_OVERFLOW_LIMIT="topology.executor.overflow.limit";
+
+    /**
+     * How often a worker should check and notify upstream workers about its tasks that are no longer experiencing
+     * backpressure and are able to receive new messages.
+     */
+    @isInteger
+    @isPositiveNumber
+    @NotNull
+    public static final String TOPOLOGY_BACKPRESSURE_CHECK_MILLIS ="topology.backpressure.check.millis";
+
+    /**
+     * How often to send flush tuple to the executors for flushing out batched events.
+     */
+    @isInteger
+    @isPositiveNumber(includeZero = true)
+    @NotNull
+    public static final String TOPOLOGY_BATCH_FLUSH_INTERVAL_MILLIS ="topology.batch.flush.interval.millis";
+
 
     /**
      * The size of the shared thread pool for worker tasks to make use of. The thread pool can be accessed
@@ -890,30 +872,93 @@ public class Config extends HashMap<String, Object> {
     public static final String TOPOLOGY_ISOLATED_MACHINES = "topology.isolate.machines";
 
     /**
-     * Configure timeout milliseconds used for disruptor queue wait strategy. Can be used to tradeoff latency
-     * vs. CPU usage
+     * Selects the Bolt's Wait Strategy to use when there are no incoming msgs. Used to trade off latency vs CPU usage.
+     */
+    @isString
+    @isDerivedFrom(baseType = IWaitStrategy.class)
+    public static final String TOPOLOGY_BOLT_WAIT_STRATEGY = "topology.bolt.wait.strategy";
+
+    /**
+     * Configures park time for WaitStrategyPark.  If set to 0, returns immediately (i.e. busy wait).
      */
-    @isInteger
     @NotNull
-    public static final String TOPOLOGY_DISRUPTOR_WAIT_TIMEOUT_MILLIS="topology.disruptor.wait.timeout.millis";
+    @isPositiveNumber(includeZero = true)
+    public static final String TOPOLOGY_BOLT_WAIT_PARK_MICROSEC = "topology.bolt.wait.park.microsec";
 
     /**
-     * The number of tuples to batch before sending to the next thread.  This number is just an initial suggestion and
-     * the code may adjust it as your topology runs.
+     * Configures the number of iterations to spend in level 1 of WaitStrategyProgressive, before progressing to level 2.
      */
+    @NotNull
     @isInteger
     @isPositiveNumber
+    public static final String TOPOLOGY_BOLT_WAIT_PROGRESSIVE_LEVEL1_COUNT =  "topology.bolt.wait.progressive.level1.count";
+
+    /**
+     * Configures the number of iterations to spend in level 2 of WaitStrategyProgressive, before progressing to level 3.
+     */
     @NotNull
-    public static final String TOPOLOGY_DISRUPTOR_BATCH_SIZE="topology.disruptor.batch.size";
+    @isInteger
+    @isPositiveNumber
+    public static final String TOPOLOGY_BOLT_WAIT_PROGRESSIVE_LEVEL2_COUNT =  "topology.bolt.wait.progressive.level2.count";
 
     /**
-     * The maximum age in milliseconds a batch can be before being sent to the next thread.  This number is just an
-     * initial suggestion and the code may adjust it as your topology runs.
+     * Configures sleep time for WaitStrategyProgressive.
      */
+    @NotNull
+    @isPositiveNumber(includeZero = true)
+    public static final String TOPOLOGY_BOLT_WAIT_PROGRESSIVE_LEVEL3_SLEEP_MILLIS = "topology.bolt.wait.progressive.level3.sleep.millis";
+
+
+    /**
+     * A class that implements a wait strategy for an upstream component (spout/bolt) trying to write to a downstream component
+     * whose recv queue is full
+     *
+     * 1. nextTuple emits no tuples
+     * 2. The spout has hit maxSpoutPending and can't emit any more tuples
+     */
+    @isString
+    @isDerivedFrom(baseType = IWaitStrategy.class)
+    public static final String TOPOLOGY_BACKPRESSURE_WAIT_STRATEGY="topology.backpressure.wait.strategy";
+
+    /**
+     * Configures park time if using WaitStrategyPark for BackPressure. If set to 0, returns immediately (i.e. busy wait).
+     */
+    @NotNull
+    @isPositiveNumber(includeZero = true)
+    public static final String TOPOLOGY_BACKPRESSURE_WAIT_PARK_MICROSEC = "topology.backpressure.wait.park.microsec";
+
+    /**
+     * Configures sleep time if using WaitStrategyProgressive for BackPressure.
+     */
+    @NotNull
+    @isPositiveNumber(includeZero = true)
+    public static final String TOPOLOGY_BACKPRESSURE_WAIT_PROGRESSIVE_LEVEL3_SLEEP_MILLIS = "topology.backpressure.wait.progressive.level3.sleep.millis";
+
+    /**
+     * Configures the steps used to determine progression to the next level of wait, if using WaitStrategyProgressive for BackPressure.
+     */
+    @NotNull
     @isInteger
     @isPositiveNumber
+    public static final String TOPOLOGY_BACKPRESSURE_WAIT_PROGRESSIVE_LEVEL1_COUNT = "topology.backpressure.wait.progressive.level1.count";
+
+    /**
+     * Configures the steps used to determine progression to the next level of wait, if using WaitStrategyProgressive for BackPressure.
+     */
+    @NotNull
+    @isInteger
+    @isPositiveNumber
+    public static final String TOPOLOGY_BACKPRESSURE_WAIT_PROGRESSIVE_LEVEL2_COUNT = "topology.backpressure.wait.progressive.level2.count";
+
+
+    /**
+     * Check recvQ after every N invocations of Spout's nextTuple() [when ACKing is disabled].
+     * Spouts receive very few msgs if ACK is disabled. This avoids checking the recvQ after each nextTuple().
+     */
+    @isInteger
+    @isPositiveNumber(includeZero = true)
     @NotNull
-    public static final String TOPOLOGY_DISRUPTOR_BATCH_TIMEOUT_MILLIS="topology.disruptor.batch.timeout.millis";
+    public static final String TOPOLOGY_SPOUT_RECVQ_SKIPS = "topology.spout.recvq.skips";
 
     /**
      * Minimum number of nimbus hosts where the code must be replicated before leader nimbus
@@ -933,17 +978,6 @@ public class Config extends HashMap<String, Object> {
     public static final String TOPOLOGY_MAX_REPLICATION_WAIT_TIME_SEC = "topology.max.replication.wait.time.sec";
 
     /**
-     * This is a config that is not likely to be used.  Internally the disruptor queue will batch entries written
-     * into the queue.  A background thread pool will flush those batches if they get too old.  By default that
-     * pool can grow rather large, and sacrifice some CPU time to keep the latency low.  In some cases you may
-     * want the queue to be smaller so there is less CPU used, but the latency will increase in some situations.
-     * This configs is on a per cluster bases, if you want to control this on a per topology bases you need to set
-     * the java System property for the worker "num_flusher_pool_threads" to the value you want.
-     */
-    @isInteger
-    public static final String STORM_WORKER_DISRUPTOR_FLUSHER_MAX_POOL_SIZE = "storm.worker.disruptor.flusher.max.pool.size";
-
-    /**
      * The list of servers that Pacemaker is running on.
      */
     @isStringList
@@ -1382,6 +1416,28 @@ public class Config extends HashMap<String, Object> {
     public static final String STORM_MESSAGING_NETTY_BUFFER_SIZE = "storm.messaging.netty.buffer_size";
 
     /**
+     * Netty based messaging: The netty write buffer high watermark in bytes.
+     * <p>
+     * If the number of bytes queued in the netty's write buffer exceeds this value, the netty {@code Channel.isWritable()}
+     * will start to return {@code false}. The client will wait until the value falls below the {@linkplain #STORM_MESSAGING_NETTY_BUFFER_LOW_WATERMARK low water mark}.
+     * </p>
+     */
+    @isInteger
+    @isPositiveNumber
+    public static final String STORM_MESSAGING_NETTY_BUFFER_HIGH_WATERMARK = "storm.messaging.netty.buffer.high.watermark";
+
+    /**
+     * Netty based messaging: The netty write buffer low watermark in bytes.
+     * <p>
+     * Once the number of bytes queued in the write buffer has exceeded the {@linkplain #STORM_MESSAGING_NETTY_BUFFER_HIGH_WATERMARK high water mark} and then
+     * dropped below this value, the netty {@code Channel.isWritable()} will start to return {@code true} again.
+     * </p>
+     */
+    @isInteger
+    @isPositiveNumber
+    public static final String STORM_MESSAGING_NETTY_BUFFER_LOW_WATERMARK = "storm.messaging.netty.buffer.low.watermark";
+
+    /**
      * Netty based messaging: Sets the backlog value to specify when the channel binds to a local address
      */
     @isInteger

http://git-wip-us.apache.org/repos/asf/storm/blob/bc4c4807/storm-client/src/jvm/org/apache/storm/Constants.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/Constants.java b/storm-client/src/jvm/org/apache/storm/Constants.java
index a8e2c01..623acfb 100644
--- a/storm-client/src/jvm/org/apache/storm/Constants.java
+++ b/storm-client/src/jvm/org/apache/storm/Constants.java
@@ -33,6 +33,7 @@ public class Constants {
     public static final List<Long> SYSTEM_EXECUTOR_ID = Arrays.asList(-1L, -1L);
     public static final String SYSTEM_COMPONENT_ID = "__system";
     public static final String SYSTEM_TICK_STREAM_ID = "__tick";
+    public static final String SYSTEM_FLUSH_STREAM_ID = "__flush";
     public static final String METRICS_COMPONENT_ID_PREFIX = "__metrics_";
     public static final String METRICS_STREAM_ID = "__metrics";
     public static final String METRICS_TICK_STREAM_ID = "__metrics_tick";
@@ -54,7 +55,6 @@ public class Constants {
     public static final String USER_TIMER = "user-timer";
     public static final String TRANSFER_FN = "transfer-fn";
     public static final String SUICIDE_FN = "suicide-fn";
-    public static final String THROTTLE_ON = "throttle-on";
     public static final String EXECUTOR_RECEIVE_QUEUE_MAP = "executor-receive-queue-map";
     public static final String STORM_ACTIVE_ATOM = "storm-active-atom";
     public static final String COMPONENT_TO_DEBUG_ATOM = "storm-component->debug-atom";