You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by sr...@apache.org on 2019/07/04 18:10:54 UTC

[storm] branch master updated: STORM-3450: perf: fix all checkstyle warnings

This is an automated email from the ASF dual-hosted git repository.

srdo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/storm.git


The following commit(s) were added to refs/heads/master by this push:
     new 7bf7268  STORM-3450: perf: fix all checkstyle warnings
     new 67ae024  Merge pull request #3066 from krichter722/checkstyle-perf
7bf7268 is described below

commit 7bf7268e25a627882ddccc68fdf58df9889011a5
Author: Karl-Philipp Richter <kr...@posteo.de>
AuthorDate: Mon Jul 1 22:18:27 2019 +0200

    STORM-3450: perf: fix all checkstyle warnings
---
 examples/storm-perf/pom.xml                        |   2 +-
 .../storm/perf/ConstSpoutIdBoltNullBoltTopo.java   |   2 +-
 .../apache/storm/perf/ConstSpoutNullBoltTopo.java  |  13 +-
 .../org/apache/storm/perf/ConstSpoutOnlyTopo.java  |  11 +-
 .../apache/storm/perf/FileReadWordCountTopo.java   |   6 +-
 .../apache/storm/perf/HdfsSpoutNullBoltTopo.java   |  10 +-
 .../org/apache/storm/perf/KafkaClientHdfsTopo.java |  16 +-
 .../org/apache/storm/perf/LowThroughputTopo.java   |   1 -
 .../apache/storm/perf/StrGenSpoutHdfsBoltTopo.java |  20 +-
 .../org/apache/storm/perf/ThroughputMeter.java     |   1 +
 .../org/apache/storm/perf/bolt/DevNullBolt.java    |   4 +-
 .../org/apache/storm/perf/queuetest/Acker.java     |  62 ++++++
 .../storm/perf/queuetest/AckingProducer.java       |  62 ++++++
 .../org/apache/storm/perf/queuetest/Consumer.java  |  63 ++++++
 .../org/apache/storm/perf/queuetest/Forwarder.java |  70 ++++++
 .../perf/{ => queuetest}/JCQueuePerfTest.java      | 239 +--------------------
 .../org/apache/storm/perf/queuetest/MyThread.java  |  36 ++++
 .../org/apache/storm/perf/queuetest/Producer.java  |  44 ++++
 .../org/apache/storm/perf/queuetest/Producer2.java |  50 +++++
 .../org/apache/storm/perf/spout/WordGenSpout.java  |   3 +-
 .../java/org/apache/storm/perf/toolstest/Cons.java |  61 ++++++
 .../perf/{ => toolstest}/JCToolsPerfTest.java      | 108 +---------
 .../org/apache/storm/perf/toolstest/MyThd.java     |  37 ++++
 .../java/org/apache/storm/perf/toolstest/Prod.java |  46 ++++
 .../org/apache/storm/perf/toolstest/Prod2.java     |  46 ++++
 .../java/org/apache/storm/perf/utils/Helper.java   |   2 +-
 .../org/apache/storm/perf/utils/MetricsSample.java |   4 +-
 27 files changed, 627 insertions(+), 392 deletions(-)

diff --git a/examples/storm-perf/pom.xml b/examples/storm-perf/pom.xml
index 527ef49..07c4b17 100644
--- a/examples/storm-perf/pom.xml
+++ b/examples/storm-perf/pom.xml
@@ -92,7 +92,7 @@
                 <artifactId>maven-checkstyle-plugin</artifactId>
                 <!--Note - the version would be inherited-->
                 <configuration>
-                    <maxAllowedViolations>65</maxAllowedViolations>
+                    <maxAllowedViolations>0</maxAllowedViolations>
                 </configuration>
             </plugin>
             <plugin>
diff --git a/examples/storm-perf/src/main/java/org/apache/storm/perf/ConstSpoutIdBoltNullBoltTopo.java b/examples/storm-perf/src/main/java/org/apache/storm/perf/ConstSpoutIdBoltNullBoltTopo.java
index a65efe6..c83763d 100644
--- a/examples/storm-perf/src/main/java/org/apache/storm/perf/ConstSpoutIdBoltNullBoltTopo.java
+++ b/examples/storm-perf/src/main/java/org/apache/storm/perf/ConstSpoutIdBoltNullBoltTopo.java
@@ -30,7 +30,7 @@ import org.apache.storm.utils.Utils;
 
 /**
  * ConstSpout -> IdBolt -> DevNullBolt This topology measures speed of messaging between spouts->bolt  and  bolt->bolt ConstSpout :
- * Continuously emits a constant string IdBolt : clones and emits input tuples DevNullBolt : discards incoming tuples
+ * Continuously emits a constant string IdBolt : clones and emits input tuples DevNullBolt : discards incoming tuples.
  */
 public class ConstSpoutIdBoltNullBoltTopo {
 
diff --git a/examples/storm-perf/src/main/java/org/apache/storm/perf/ConstSpoutNullBoltTopo.java b/examples/storm-perf/src/main/java/org/apache/storm/perf/ConstSpoutNullBoltTopo.java
index 9683e08..63ef51b 100755
--- a/examples/storm-perf/src/main/java/org/apache/storm/perf/ConstSpoutNullBoltTopo.java
+++ b/examples/storm-perf/src/main/java/org/apache/storm/perf/ConstSpoutNullBoltTopo.java
@@ -28,12 +28,13 @@ import org.apache.storm.topology.BoltDeclarer;
 import org.apache.storm.topology.TopologyBuilder;
 import org.apache.storm.utils.Utils;
 
-/***
- *  This topo helps measure the messaging peak throughput between a spout and a bolt.
- *  Spout generates a stream of a fixed string.
- *  Bolt will simply ack and discard the tuple received
+/**
+ * This topo helps measure the messaging peak throughput between a spout and a bolt.
+ *
+ * <p>Spout generates a stream of a fixed string.
+ *
+ * <p>Bolt will simply ack and discard the tuple received.
  */
-
 public class ConstSpoutNullBoltTopo {
 
     public static final String TOPOLOGY_NAME = "ConstSpoutNullBoltTopo";
@@ -79,7 +80,7 @@ public class ConstSpoutNullBoltTopo {
     }
 
     /**
-     * ConstSpout -> DevNullBolt with configurable grouping (default localOrShuffle)
+     * ConstSpout -> DevNullBolt with configurable grouping (default localOrShuffle).
      */
     public static void main(String[] args) throws Exception {
         int runTime = -1;
diff --git a/examples/storm-perf/src/main/java/org/apache/storm/perf/ConstSpoutOnlyTopo.java b/examples/storm-perf/src/main/java/org/apache/storm/perf/ConstSpoutOnlyTopo.java
index 5848cbc..1b012fe 100755
--- a/examples/storm-perf/src/main/java/org/apache/storm/perf/ConstSpoutOnlyTopo.java
+++ b/examples/storm-perf/src/main/java/org/apache/storm/perf/ConstSpoutOnlyTopo.java
@@ -25,12 +25,11 @@ import org.apache.storm.perf.utils.Helper;
 import org.apache.storm.topology.TopologyBuilder;
 import org.apache.storm.utils.Utils;
 
-
-/***
- * This topo helps measure how fast a spout can produce data (so no bolts are attached)
- *  Spout generates a stream of a fixed string.
+/**
+ * This topo helps measure how fast a spout can produce data (so no bolts are attached).
+ *
+ * <p>Spout generates a stream of a fixed string.
  */
-
 public class ConstSpoutOnlyTopo {
 
     public static final String TOPOLOGY_NAME = "ConstSpoutOnlyTopo";
@@ -49,7 +48,7 @@ public class ConstSpoutOnlyTopo {
     }
 
     /**
-     * ConstSpout only topology  (No bolts)
+     * ConstSpout only topology  (No bolts).
      */
     public static void main(String[] args) throws Exception {
         int runTime = -1;
diff --git a/examples/storm-perf/src/main/java/org/apache/storm/perf/FileReadWordCountTopo.java b/examples/storm-perf/src/main/java/org/apache/storm/perf/FileReadWordCountTopo.java
index 827337e..78eaab7 100644
--- a/examples/storm-perf/src/main/java/org/apache/storm/perf/FileReadWordCountTopo.java
+++ b/examples/storm-perf/src/main/java/org/apache/storm/perf/FileReadWordCountTopo.java
@@ -29,11 +29,11 @@ import org.apache.storm.topology.TopologyBuilder;
 import org.apache.storm.tuple.Fields;
 import org.apache.storm.utils.Utils;
 
-/***
+/**
  * This topo helps measure speed of word count.
- *  Spout loads a file into memory on initialization, then emits the lines in an endless loop.
+ *
+ * <p>Spout loads a file into memory on initialization, then emits the lines in an endless loop.
  */
-
 public class FileReadWordCountTopo {
     public static final String SPOUT_ID = "spout";
     public static final String COUNT_ID = "counter";
diff --git a/examples/storm-perf/src/main/java/org/apache/storm/perf/HdfsSpoutNullBoltTopo.java b/examples/storm-perf/src/main/java/org/apache/storm/perf/HdfsSpoutNullBoltTopo.java
index 4f97dfa..9876cac 100644
--- a/examples/storm-perf/src/main/java/org/apache/storm/perf/HdfsSpoutNullBoltTopo.java
+++ b/examples/storm-perf/src/main/java/org/apache/storm/perf/HdfsSpoutNullBoltTopo.java
@@ -28,13 +28,13 @@ import org.apache.storm.perf.utils.Helper;
 import org.apache.storm.topology.TopologyBuilder;
 import org.apache.storm.utils.Utils;
 
-/***
+/**
  * This topo helps measure speed of reading from Hdfs.
- *  Spout Reads from Hdfs.
- *  Bolt acks and discards tuples
+ *
+ * <p>Spout Reads from Hdfs.
+ *
+ * <p>Bolt acks and discards tuples.
  */
-
-
 public class HdfsSpoutNullBoltTopo {
     public static final int DEFAULT_SPOUT_NUM = 1;
     public static final int DEFAULT_BOLT_NUM = 1;
diff --git a/examples/storm-perf/src/main/java/org/apache/storm/perf/KafkaClientHdfsTopo.java b/examples/storm-perf/src/main/java/org/apache/storm/perf/KafkaClientHdfsTopo.java
index e269873..5eb61b2 100755
--- a/examples/storm-perf/src/main/java/org/apache/storm/perf/KafkaClientHdfsTopo.java
+++ b/examples/storm-perf/src/main/java/org/apache/storm/perf/KafkaClientHdfsTopo.java
@@ -38,12 +38,13 @@ import org.apache.storm.tuple.Tuple;
 import org.apache.storm.utils.ObjectReader;
 import org.apache.storm.utils.Utils;
 
-/***
+/**
  * This topo helps measure speed of reading from Kafka and writing to Hdfs.
- *  Spout Reads from Kafka.
- *  Bolt writes to Hdfs
+ *
+ * <p>Spout Reads from Kafka.
+ *
+ * <p>Bolt writes to Hdfs.
  */
-
 public class KafkaClientHdfsTopo {
 
     // configs - topo parallelism
@@ -123,7 +124,7 @@ public class KafkaClientHdfsTopo {
 
 
     /**
-     * Copies text file content from sourceDir to destinationDir. Moves source files into sourceDir after its done consuming
+     * Copies text file content from sourceDir to destinationDir. Moves source files into sourceDir after its done consuming.
      */
     public static void main(String[] args) throws Exception {
 
@@ -132,7 +133,6 @@ public class KafkaClientHdfsTopo {
             return;
         }
 
-        Integer durationSec = Integer.parseInt(args[0]);
         String confFile = args[1];
         Map<String, Object> topoConf = Utils.findAndReadConfigFile(confFile);
         topoConf.put(Config.TOPOLOGY_PRODUCER_BATCH_SIZE, 1000);
@@ -143,6 +143,7 @@ public class KafkaClientHdfsTopo {
 
         topoConf.putAll(Utils.readCommandLineOpts());
         //  Submit topology to Storm cluster
+        Integer durationSec = Integer.parseInt(args[0]);
         Helper.runOnClusterAndPrintMetrics(durationSec, TOPOLOGY_NAME, topoConf, getTopology(topoConf));
     }
 
@@ -156,9 +157,6 @@ public class KafkaClientHdfsTopo {
 
         /**
          * Overrides the default record delimiter.
-         *
-         * @param delimiter
-         * @return
          */
         public LineWriter withLineDelimiter(String delimiter) {
             this.lineDelimiter = delimiter;
diff --git a/examples/storm-perf/src/main/java/org/apache/storm/perf/LowThroughputTopo.java b/examples/storm-perf/src/main/java/org/apache/storm/perf/LowThroughputTopo.java
index e4972a4..38e35bc 100644
--- a/examples/storm-perf/src/main/java/org/apache/storm/perf/LowThroughputTopo.java
+++ b/examples/storm-perf/src/main/java/org/apache/storm/perf/LowThroughputTopo.java
@@ -18,7 +18,6 @@
 
 package org.apache.storm.perf;
 
-
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
diff --git a/examples/storm-perf/src/main/java/org/apache/storm/perf/StrGenSpoutHdfsBoltTopo.java b/examples/storm-perf/src/main/java/org/apache/storm/perf/StrGenSpoutHdfsBoltTopo.java
index fa0d90b..0c53db9 100755
--- a/examples/storm-perf/src/main/java/org/apache/storm/perf/StrGenSpoutHdfsBoltTopo.java
+++ b/examples/storm-perf/src/main/java/org/apache/storm/perf/StrGenSpoutHdfsBoltTopo.java
@@ -36,12 +36,13 @@ import org.apache.storm.topology.TopologyBuilder;
 import org.apache.storm.tuple.Tuple;
 import org.apache.storm.utils.Utils;
 
-/***
- * This topo helps measure speed of writing to Hdfs
- *  Spout generates fixed length random strings.
- *  Bolt writes to Hdfs
+/**
+ * This topo helps measure speed of writing to Hdfs.
+ *
+ * <p>Spout generates fixed length random strings.
+ *
+ * <p>Bolt writes to Hdfs.
  */
-
 public class StrGenSpoutHdfsBoltTopo {
 
     // configs - topo parallelism
@@ -71,7 +72,7 @@ public class StrGenSpoutHdfsBoltTopo {
 
 
         // 2 -  Setup HFS Bolt   --------
-        String Hdfs_url = Helper.getStr(topoConf, HDFS_URI);
+        String hdfsUrl = Helper.getStr(topoConf, HDFS_URI);
         RecordFormat format = new LineWriter("str");
         SyncPolicy syncPolicy = new CountSyncPolicy(hdfsBatch);
         FileRotationPolicy rotationPolicy = new FileSizeRotationPolicy(1.0f, FileSizeRotationPolicy.Units.GB);
@@ -83,7 +84,7 @@ public class StrGenSpoutHdfsBoltTopo {
 
         // Instantiate the HdfsBolt
         HdfsBolt bolt = new HdfsBolt()
-            .withFsUrl(Hdfs_url)
+            .withFsUrl(hdfsUrl)
             .withFileNameFormat(fileNameFormat)
             .withRecordFormat(format)
             .withRotationPolicy(rotationPolicy)
@@ -102,7 +103,7 @@ public class StrGenSpoutHdfsBoltTopo {
 
 
     /**
-     * Spout generates random strings and HDFS bolt writes them to a text file
+     * Spout generates random strings and HDFS bolt writes them to a text file.
      */
     public static void main(String[] args) throws Exception {
         String confFile = "conf/HdfsSpoutTopo.yaml";
@@ -144,9 +145,6 @@ public class StrGenSpoutHdfsBoltTopo {
 
         /**
          * Overrides the default record delimiter.
-         *
-         * @param delimiter
-         * @return
          */
         public LineWriter withLineDelimiter(String delimiter) {
             this.lineDelimiter = delimiter;
diff --git a/examples/storm-perf/src/main/java/org/apache/storm/perf/ThroughputMeter.java b/examples/storm-perf/src/main/java/org/apache/storm/perf/ThroughputMeter.java
index bd5d1e1..3d8e736 100644
--- a/examples/storm-perf/src/main/java/org/apache/storm/perf/ThroughputMeter.java
+++ b/examples/storm-perf/src/main/java/org/apache/storm/perf/ThroughputMeter.java
@@ -31,6 +31,7 @@ public class ThroughputMeter {
     }
 
     /**
+     * Calculate throughput.
      * @return events/sec
      */
     private static double calcThroughput(long count, long startTime, long endTime) {
diff --git a/examples/storm-perf/src/main/java/org/apache/storm/perf/bolt/DevNullBolt.java b/examples/storm-perf/src/main/java/org/apache/storm/perf/bolt/DevNullBolt.java
index abb397d..cc181c0 100755
--- a/examples/storm-perf/src/main/java/org/apache/storm/perf/bolt/DevNullBolt.java
+++ b/examples/storm-perf/src/main/java/org/apache/storm/perf/bolt/DevNullBolt.java
@@ -33,7 +33,7 @@ public class DevNullBolt extends BaseRichBolt {
     private static final org.slf4j.Logger LOG = LoggerFactory.getLogger(DevNullBolt.class);
     private OutputCollector collector;
     private Long sleepNanos;
-    private int eCount = 0;
+    private int count = 0;
 
     @Override
     public void prepare(Map<String, Object> topoConf, TopologyContext context, OutputCollector collector) {
@@ -47,7 +47,7 @@ public class DevNullBolt extends BaseRichBolt {
         if (sleepNanos > 0) {
             LockSupport.parkNanos(sleepNanos);
         }
-        ++eCount;
+        ++count;
     }
 
     @Override
diff --git a/examples/storm-perf/src/main/java/org/apache/storm/perf/queuetest/Acker.java b/examples/storm-perf/src/main/java/org/apache/storm/perf/queuetest/Acker.java
new file mode 100644
index 0000000..e936329
--- /dev/null
+++ b/examples/storm-perf/src/main/java/org/apache/storm/perf/queuetest/Acker.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License
+ */
+
+package org.apache.storm.perf.queuetest;
+
+import org.apache.storm.utils.JCQueue;
+
+/**
+ * Reads from ackerInQ and writes to spout queue.
+ */
+class Acker extends MyThread {
+    private final JCQueue ackerInQ;
+    private final JCQueue spoutInQ;
+
+    public Acker(JCQueue ackerInQ, JCQueue spoutInQ) {
+        super("Acker");
+        this.ackerInQ = ackerInQ;
+        this.spoutInQ = spoutInQ;
+    }
+
+
+    @Override
+    public void run() {
+        long start = System.currentTimeMillis();
+        Handler handler = new Handler();
+        while (!Thread.interrupted()) {
+            ackerInQ.consume(handler);
+        }
+        runTime = System.currentTimeMillis() - start;
+    }
+
+    private class Handler implements JCQueue.Consumer {
+        @Override
+        public void accept(Object event) {
+            try {
+                spoutInQ.publish(event);
+            } catch (InterruptedException e) {
+                throw new RuntimeException(e);
+            }
+        }
+
+        @Override
+        public void flush() throws InterruptedException {
+            spoutInQ.flush();
+        }
+    }
+}
diff --git a/examples/storm-perf/src/main/java/org/apache/storm/perf/queuetest/AckingProducer.java b/examples/storm-perf/src/main/java/org/apache/storm/perf/queuetest/AckingProducer.java
new file mode 100644
index 0000000..132d793
--- /dev/null
+++ b/examples/storm-perf/src/main/java/org/apache/storm/perf/queuetest/AckingProducer.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License
+ */
+
+package org.apache.storm.perf.queuetest;
+
+import org.apache.storm.utils.JCQueue;
+
+/**
+ * Writes to two queues.
+ */
+class AckingProducer extends MyThread {
+    private final JCQueue ackerInQ;
+    private final JCQueue spoutInQ;
+
+    public AckingProducer(JCQueue ackerInQ, JCQueue spoutInQ) {
+        super("AckingProducer");
+        this.ackerInQ = ackerInQ;
+        this.spoutInQ = spoutInQ;
+    }
+
+    @Override
+    public void run() {
+        try {
+            Handler handler = new Handler();
+            long start = System.currentTimeMillis();
+            while (!Thread.interrupted()) {
+                int x = spoutInQ.consume(handler);
+                ackerInQ.publish(count);
+            }
+            runTime = System.currentTimeMillis() - start;
+        } catch (InterruptedException e) {
+            return;
+        }
+    }
+
+    private class Handler implements JCQueue.Consumer {
+        @Override
+        public void accept(Object event) {
+            // no-op
+        }
+
+        @Override
+        public void flush() {
+            // no-op
+        }
+    }
+}
diff --git a/examples/storm-perf/src/main/java/org/apache/storm/perf/queuetest/Consumer.java b/examples/storm-perf/src/main/java/org/apache/storm/perf/queuetest/Consumer.java
new file mode 100644
index 0000000..52ff210
--- /dev/null
+++ b/examples/storm-perf/src/main/java/org/apache/storm/perf/queuetest/Consumer.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License
+ */
+
+package org.apache.storm.perf.queuetest;
+
+import java.util.concurrent.locks.LockSupport;
+import org.apache.storm.utils.JCQueue;
+import org.apache.storm.utils.MutableLong;
+
+class Consumer extends MyThread {
+    public final MutableLong counter = new MutableLong(0);
+    private final JCQueue queue;
+
+    public Consumer(JCQueue queue) {
+        super("Consumer");
+        this.queue = queue;
+    }
+
+    @Override
+    public void run() {
+        Handler handler = new Handler();
+        long start = System.currentTimeMillis();
+        while (!Thread.interrupted()) {
+            int x = queue.consume(handler);
+            if (x == 0) {
+                LockSupport.parkNanos(1);
+            }
+        }
+        runTime = System.currentTimeMillis() - start;
+    }
+
+    @Override
+    public long getCount() {
+        return counter.get();
+    }
+
+    private class Handler implements JCQueue.Consumer {
+        @Override
+        public void accept(Object event) {
+            counter.increment();
+        }
+
+        @Override
+        public void flush() {
+            // no-op
+        }
+    }
+}
diff --git a/examples/storm-perf/src/main/java/org/apache/storm/perf/queuetest/Forwarder.java b/examples/storm-perf/src/main/java/org/apache/storm/perf/queuetest/Forwarder.java
new file mode 100644
index 0000000..2f951dd
--- /dev/null
+++ b/examples/storm-perf/src/main/java/org/apache/storm/perf/queuetest/Forwarder.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License
+ */
+
+package org.apache.storm.perf.queuetest;
+
+import java.util.concurrent.locks.LockSupport;
+import org.apache.storm.utils.JCQueue;
+import org.apache.storm.utils.MutableLong;
+
+class Forwarder extends MyThread {
+    public final MutableLong counter = new MutableLong(0);
+    private final JCQueue inq;
+    private final JCQueue outq;
+
+    public Forwarder(JCQueue inq, JCQueue outq) {
+        super("Forwarder");
+        this.inq = inq;
+        this.outq = outq;
+    }
+
+    @Override
+    public void run() {
+        Handler handler = new Handler();
+        long start = System.currentTimeMillis();
+        while (!Thread.interrupted()) {
+            int x = inq.consume(handler);
+            if (x == 0) {
+                LockSupport.parkNanos(1);
+            }
+        }
+        runTime = System.currentTimeMillis() - start;
+    }
+
+    @Override
+    public long getCount() {
+        return counter.get();
+    }
+
+    private class Handler implements JCQueue.Consumer {
+        @Override
+        public void accept(Object event) {
+            try {
+                outq.publish(event);
+                counter.increment();
+            } catch (InterruptedException e) {
+                throw new RuntimeException(e);
+            }
+        }
+
+        @Override
+        public void flush() {
+            // no-op
+        }
+    }
+}
diff --git a/examples/storm-perf/src/main/java/org/apache/storm/perf/JCQueuePerfTest.java b/examples/storm-perf/src/main/java/org/apache/storm/perf/queuetest/JCQueuePerfTest.java
similarity index 50%
rename from examples/storm-perf/src/main/java/org/apache/storm/perf/JCQueuePerfTest.java
rename to examples/storm-perf/src/main/java/org/apache/storm/perf/queuetest/JCQueuePerfTest.java
index 6ba56a9..13d880b 100644
--- a/examples/storm-perf/src/main/java/org/apache/storm/perf/JCQueuePerfTest.java
+++ b/examples/storm-perf/src/main/java/org/apache/storm/perf/queuetest/JCQueuePerfTest.java
@@ -16,7 +16,7 @@
  * limitations under the License
  */
 
-package org.apache.storm.perf;
+package org.apache.storm.perf.queuetest;
 
 import java.util.concurrent.locks.LockSupport;
 import org.apache.storm.metrics2.StormMetricRegistry;
@@ -24,6 +24,7 @@ import org.apache.storm.policy.WaitStrategyPark;
 import org.apache.storm.utils.JCQueue;
 import org.apache.storm.utils.MutableLong;
 
+@SuppressWarnings("checkstyle:AbbreviationAsWordInName")
 public class JCQueuePerfTest {
     // Usage: Let it and then explicitly terminate.
     // Metrics will be printed when application is terminated.
@@ -147,240 +148,4 @@ public class JCQueuePerfTest {
         }));
 
     }
-
-}
-
-
-abstract class MyThread extends Thread {
-    public long count = 0;
-    public long runTime = 0;
-
-    public MyThread(String thdName) {
-        super(thdName);
-    }
-
-    public long throughput() {
-        return getCount() / (runTime / 1000);
-    }
-
-    public long getCount() {
-        return count;
-    }
-}
-
-class Producer extends MyThread {
-    private final JCQueue q;
-
-    public Producer(JCQueue q) {
-        super("Producer");
-        this.q = q;
-    }
-
-    @Override
-    public void run() {
-        try {
-            long start = System.currentTimeMillis();
-            while (!Thread.interrupted()) {
-                q.publish(++count);
-            }
-            runTime = System.currentTimeMillis() - start;
-        } catch (InterruptedException e) {
-            return;
-        }
-    }
-
-}
-
-// writes to two queues
-class Producer2 extends MyThread {
-    private final JCQueue q1;
-    private final JCQueue q2;
-
-    public Producer2(JCQueue q1, JCQueue q2) {
-        super("Producer2");
-        this.q1 = q1;
-        this.q2 = q2;
-    }
-
-    @Override
-    public void run() {
-        try {
-            long start = System.currentTimeMillis();
-            while (!Thread.interrupted()) {
-                q1.publish(++count);
-                q2.publish(count);
-            }
-            runTime = System.currentTimeMillis() - start;
-        } catch (InterruptedException e) {
-            return;
-        }
-
-    }
 }
-
-
-// writes to two queues
-class AckingProducer extends MyThread {
-    private final JCQueue ackerInQ;
-    private final JCQueue spoutInQ;
-
-    public AckingProducer(JCQueue ackerInQ, JCQueue spoutInQ) {
-        super("AckingProducer");
-        this.ackerInQ = ackerInQ;
-        this.spoutInQ = spoutInQ;
-    }
-
-    @Override
-    public void run() {
-        try {
-            Handler handler = new Handler();
-            long start = System.currentTimeMillis();
-            while (!Thread.interrupted()) {
-                int x = spoutInQ.consume(handler);
-                ackerInQ.publish(count);
-            }
-            runTime = System.currentTimeMillis() - start;
-        } catch (InterruptedException e) {
-            return;
-        }
-    }
-
-    private class Handler implements JCQueue.Consumer {
-        @Override
-        public void accept(Object event) {
-            // no-op
-        }
-
-        @Override
-        public void flush() {
-            // no-op
-        }
-    }
-}
-
-// reads from ackerInQ and writes to spout queue
-class Acker extends MyThread {
-    private final JCQueue ackerInQ;
-    private final JCQueue spoutInQ;
-
-    public Acker(JCQueue ackerInQ, JCQueue spoutInQ) {
-        super("Acker");
-        this.ackerInQ = ackerInQ;
-        this.spoutInQ = spoutInQ;
-    }
-
-
-    @Override
-    public void run() {
-        long start = System.currentTimeMillis();
-        Handler handler = new Handler();
-        while (!Thread.interrupted()) {
-            ackerInQ.consume(handler);
-        }
-        runTime = System.currentTimeMillis() - start;
-    }
-
-    private class Handler implements JCQueue.Consumer {
-        @Override
-        public void accept(Object event) {
-            try {
-                spoutInQ.publish(event);
-            } catch (InterruptedException e) {
-                throw new RuntimeException(e);
-            }
-        }
-
-        @Override
-        public void flush() throws InterruptedException {
-            spoutInQ.flush();
-        }
-    }
-}
-
-class Consumer extends MyThread {
-    public final MutableLong counter = new MutableLong(0);
-    private final JCQueue q;
-
-    public Consumer(JCQueue q) {
-        super("Consumer");
-        this.q = q;
-    }
-
-    @Override
-    public void run() {
-        Handler handler = new Handler();
-        long start = System.currentTimeMillis();
-        while (!Thread.interrupted()) {
-            int x = q.consume(handler);
-            if (x == 0) {
-                LockSupport.parkNanos(1);
-            }
-        }
-        runTime = System.currentTimeMillis() - start;
-    }
-
-    @Override
-    public long getCount() {
-        return counter.get();
-    }
-
-    private class Handler implements JCQueue.Consumer {
-        @Override
-        public void accept(Object event) {
-            counter.increment();
-        }
-
-        @Override
-        public void flush() {
-            // no-op
-        }
-    }
-}
-
-
-class Forwarder extends MyThread {
-    public final MutableLong counter = new MutableLong(0);
-    private final JCQueue inq;
-    private final JCQueue outq;
-
-    public Forwarder(JCQueue inq, JCQueue outq) {
-        super("Forwarder");
-        this.inq = inq;
-        this.outq = outq;
-    }
-
-    @Override
-    public void run() {
-        Handler handler = new Handler();
-        long start = System.currentTimeMillis();
-        while (!Thread.interrupted()) {
-            int x = inq.consume(handler);
-            if (x == 0) {
-                LockSupport.parkNanos(1);
-            }
-        }
-        runTime = System.currentTimeMillis() - start;
-    }
-
-    @Override
-    public long getCount() {
-        return counter.get();
-    }
-
-    private class Handler implements JCQueue.Consumer {
-        @Override
-        public void accept(Object event) {
-            try {
-                outq.publish(event);
-                counter.increment();
-            } catch (InterruptedException e) {
-                throw new RuntimeException(e);
-            }
-        }
-
-        @Override
-        public void flush() {
-            // no-op
-        }
-    }
-}
\ No newline at end of file
diff --git a/examples/storm-perf/src/main/java/org/apache/storm/perf/queuetest/MyThread.java b/examples/storm-perf/src/main/java/org/apache/storm/perf/queuetest/MyThread.java
new file mode 100644
index 0000000..4c84316
--- /dev/null
+++ b/examples/storm-perf/src/main/java/org/apache/storm/perf/queuetest/MyThread.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License
+ */
+
+package org.apache.storm.perf.queuetest;
+
+abstract class MyThread extends Thread {
+    public long count = 0;
+    public long runTime = 0;
+
+    public MyThread(String thdName) {
+        super(thdName);
+    }
+
+    public long throughput() {
+        return getCount() / (runTime / 1000);
+    }
+
+    public long getCount() {
+        return count;
+    }
+}
diff --git a/examples/storm-perf/src/main/java/org/apache/storm/perf/queuetest/Producer.java b/examples/storm-perf/src/main/java/org/apache/storm/perf/queuetest/Producer.java
new file mode 100644
index 0000000..eb02d5e
--- /dev/null
+++ b/examples/storm-perf/src/main/java/org/apache/storm/perf/queuetest/Producer.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License
+ */
+
+package org.apache.storm.perf.queuetest;
+
+import org.apache.storm.utils.JCQueue;
+
+class Producer extends MyThread {
+    private final JCQueue queue;
+
+    public Producer(JCQueue queue) {
+        super("Producer");
+        this.queue = queue;
+    }
+
+    @Override
+    public void run() {
+        try {
+            long start = System.currentTimeMillis();
+            while (!Thread.interrupted()) {
+                queue.publish(++count);
+            }
+            runTime = System.currentTimeMillis() - start;
+        } catch (InterruptedException e) {
+            return;
+        }
+    }
+
+}
diff --git a/examples/storm-perf/src/main/java/org/apache/storm/perf/queuetest/Producer2.java b/examples/storm-perf/src/main/java/org/apache/storm/perf/queuetest/Producer2.java
new file mode 100644
index 0000000..8df519b
--- /dev/null
+++ b/examples/storm-perf/src/main/java/org/apache/storm/perf/queuetest/Producer2.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License
+ */
+
+package org.apache.storm.perf.queuetest;
+
+import org.apache.storm.utils.JCQueue;
+
+/**
+ * Writes to two queues.
+ */
+class Producer2 extends MyThread {
+    private final JCQueue q1;
+    private final JCQueue q2;
+
+    public Producer2(JCQueue q1, JCQueue q2) {
+        super("Producer2");
+        this.q1 = q1;
+        this.q2 = q2;
+    }
+
+    @Override
+    public void run() {
+        try {
+            long start = System.currentTimeMillis();
+            while (!Thread.interrupted()) {
+                q1.publish(++count);
+                q2.publish(count);
+            }
+            runTime = System.currentTimeMillis() - start;
+        } catch (InterruptedException e) {
+            return;
+        }
+
+    }
+}
\ No newline at end of file
diff --git a/examples/storm-perf/src/main/java/org/apache/storm/perf/spout/WordGenSpout.java b/examples/storm-perf/src/main/java/org/apache/storm/perf/spout/WordGenSpout.java
index f46d6f3..a6a1fc9 100644
--- a/examples/storm-perf/src/main/java/org/apache/storm/perf/spout/WordGenSpout.java
+++ b/examples/storm-perf/src/main/java/org/apache/storm/perf/spout/WordGenSpout.java
@@ -63,8 +63,9 @@ public class WordGenSpout extends BaseRichSpout {
             try {
                 String line;
                 while ((line = reader.readLine()) != null) {
-                    for (String word : line.split("\\s+"))
+                    for (String word : line.split("\\s+")) {
                         lines.add(word);
+                    }
                 }
             } catch (IOException e) {
                 throw new RuntimeException("Reading file failed", e);
diff --git a/examples/storm-perf/src/main/java/org/apache/storm/perf/toolstest/Cons.java b/examples/storm-perf/src/main/java/org/apache/storm/perf/toolstest/Cons.java
new file mode 100644
index 0000000..3783310
--- /dev/null
+++ b/examples/storm-perf/src/main/java/org/apache/storm/perf/toolstest/Cons.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License
+ */
+
+package org.apache.storm.perf.toolstest;
+
+import java.util.concurrent.locks.LockSupport;
+import org.apache.storm.utils.MutableLong;
+import org.jctools.queues.MpscArrayQueue;
+
+class Cons extends MyThd {
+    public final MutableLong counter = new MutableLong(0);
+    private final MpscArrayQueue<Object> queue;
+
+    public Cons(MpscArrayQueue<Object> queue) {
+        super("Consumer");
+        this.queue = queue;
+    }
+
+    @Override
+    public void run() {
+        Handler handler = new Handler();
+        long start = System.currentTimeMillis();
+
+        while (!halt) {
+            int x = queue.drain(handler);
+            if (x == 0) {
+                LockSupport.parkNanos(1);
+            } else {
+                counter.increment();
+            }
+        }
+        runTime = System.currentTimeMillis() - start;
+    }
+
+    @Override
+    public long getCount() {
+        return counter.get();
+    }
+
+    private class Handler implements org.jctools.queues.MessagePassingQueue.Consumer<Object> {
+        @Override
+        public void accept(Object event) {
+            counter.increment();
+        }
+    }
+}
diff --git a/examples/storm-perf/src/main/java/org/apache/storm/perf/JCToolsPerfTest.java b/examples/storm-perf/src/main/java/org/apache/storm/perf/toolstest/JCToolsPerfTest.java
similarity index 60%
rename from examples/storm-perf/src/main/java/org/apache/storm/perf/JCToolsPerfTest.java
rename to examples/storm-perf/src/main/java/org/apache/storm/perf/toolstest/JCToolsPerfTest.java
index a439522..b5afe28 100644
--- a/examples/storm-perf/src/main/java/org/apache/storm/perf/JCToolsPerfTest.java
+++ b/examples/storm-perf/src/main/java/org/apache/storm/perf/toolstest/JCToolsPerfTest.java
@@ -16,12 +16,11 @@
  * limitations under the License
  */
 
-package org.apache.storm.perf;
+package org.apache.storm.perf.toolstest;
 
-import java.util.concurrent.locks.LockSupport;
-import org.apache.storm.utils.MutableLong;
 import org.jctools.queues.MpscArrayQueue;
 
+@SuppressWarnings("checkstyle:AbbreviationAsWordInName")
 public class JCToolsPerfTest {
     public static void main(String[] args) throws Exception {
         //        oneProducer1Consumer();
@@ -120,107 +119,4 @@ public class JCToolsPerfTest {
 }
 
 
-abstract class MyThd extends Thread {
-    public long count = 0;
-    public long runTime = 0;
-    public boolean halt = false;
 
-    public MyThd(String thdName) {
-        super(thdName);
-    }
-
-    public long throughput() {
-        return getCount() / (runTime / 1000);
-    }
-
-    public long getCount() {
-        return count;
-    }
-}
-
-class Prod extends MyThd {
-    private final MpscArrayQueue<Object> q;
-
-    public Prod(MpscArrayQueue<Object> q) {
-        super("Producer");
-        this.q = q;
-    }
-
-    @Override
-    public void run() {
-        long start = System.currentTimeMillis();
-
-        while (!halt) {
-            ++count;
-            while (!q.offer(count)) {
-                if (Thread.interrupted()) {
-                    return;
-                }
-            }
-        }
-        runTime = System.currentTimeMillis() - start;
-    }
-
-}
-
-// writes to two queues
-class Prod2 extends MyThd {
-    private final MpscArrayQueue<Object> q1;
-    private final MpscArrayQueue<Object> q2;
-
-    public Prod2(MpscArrayQueue<Object> q1, MpscArrayQueue<Object> q2) {
-        super("Producer2");
-        this.q1 = q1;
-        this.q2 = q2;
-    }
-
-    @Override
-    public void run() {
-        long start = System.currentTimeMillis();
-
-        while (!halt) {
-            q1.offer(++count);
-            q2.offer(count);
-        }
-        runTime = System.currentTimeMillis() - start;
-    }
-}
-
-
-class Cons extends MyThd {
-    public final MutableLong counter = new MutableLong(0);
-    private final MpscArrayQueue<Object> q;
-
-    public Cons(MpscArrayQueue<Object> q) {
-        super("Consumer");
-        this.q = q;
-    }
-
-    @Override
-    public void run() {
-        Handler handler = new Handler();
-        long start = System.currentTimeMillis();
-
-        while (!halt) {
-            int x = q.drain(handler);
-            if (x == 0) {
-                LockSupport.parkNanos(1);
-            } else {
-                counter.increment();
-            }
-        }
-        runTime = System.currentTimeMillis() - start;
-    }
-
-    @Override
-    public long getCount() {
-        return counter.get();
-    }
-
-    private class Handler implements org.jctools.queues.MessagePassingQueue.Consumer<Object> {
-        @Override
-        public void accept(Object event) {
-            counter.increment();
-        }
-    }
-}
diff --git a/examples/storm-perf/src/main/java/org/apache/storm/perf/toolstest/MyThd.java b/examples/storm-perf/src/main/java/org/apache/storm/perf/toolstest/MyThd.java
new file mode 100644
index 0000000..fbd7aab
--- /dev/null
+++ b/examples/storm-perf/src/main/java/org/apache/storm/perf/toolstest/MyThd.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License
+ */
+
+package org.apache.storm.perf.toolstest;
+
+abstract class MyThd extends Thread {
+    public long count = 0;
+    public long runTime = 0;
+    public boolean halt = false;
+
+    public MyThd(String thdName) {
+        super(thdName);
+    }
+
+    public long throughput() {
+        return getCount() / (runTime / 1000);
+    }
+
+    public long getCount() {
+        return count;
+    }
+}
diff --git a/examples/storm-perf/src/main/java/org/apache/storm/perf/toolstest/Prod.java b/examples/storm-perf/src/main/java/org/apache/storm/perf/toolstest/Prod.java
new file mode 100644
index 0000000..3553552
--- /dev/null
+++ b/examples/storm-perf/src/main/java/org/apache/storm/perf/toolstest/Prod.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License
+ */
+
+package org.apache.storm.perf.toolstest;
+
+import org.jctools.queues.MpscArrayQueue;
+
+class Prod extends MyThd {
+    private final MpscArrayQueue<Object> queue;
+
+    public Prod(MpscArrayQueue<Object> queue) {
+        super("Producer");
+        this.queue = queue;
+    }
+
+    @Override
+    public void run() {
+        long start = System.currentTimeMillis();
+
+        while (!halt) {
+            ++count;
+            while (!queue.offer(count)) {
+                if (interrupted()) {
+                    return;
+                }
+            }
+        }
+        runTime = System.currentTimeMillis() - start;
+    }
+
+}
\ No newline at end of file
diff --git a/examples/storm-perf/src/main/java/org/apache/storm/perf/toolstest/Prod2.java b/examples/storm-perf/src/main/java/org/apache/storm/perf/toolstest/Prod2.java
new file mode 100644
index 0000000..470522a
--- /dev/null
+++ b/examples/storm-perf/src/main/java/org/apache/storm/perf/toolstest/Prod2.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License
+ */
+
+package org.apache.storm.perf.toolstest;
+
+import org.jctools.queues.MpscArrayQueue;
+
+/**
+ * Writes to two queues.
+ */
+class Prod2 extends MyThd {
+    private final MpscArrayQueue<Object> q1;
+    private final MpscArrayQueue<Object> q2;
+
+    public Prod2(MpscArrayQueue<Object> q1, MpscArrayQueue<Object> q2) {
+        super("Producer2");
+        this.q1 = q1;
+        this.q2 = q2;
+    }
+
+    @Override
+    public void run() {
+        long start = System.currentTimeMillis();
+
+        while (!halt) {
+            q1.offer(++count);
+            q2.offer(count);
+        }
+        runTime = System.currentTimeMillis() - start;
+    }
+}
diff --git a/examples/storm-perf/src/main/java/org/apache/storm/perf/utils/Helper.java b/examples/storm-perf/src/main/java/org/apache/storm/perf/utils/Helper.java
index b3a206c..49be70e 100755
--- a/examples/storm-perf/src/main/java/org/apache/storm/perf/utils/Helper.java
+++ b/examples/storm-perf/src/main/java/org/apache/storm/perf/utils/Helper.java
@@ -68,7 +68,7 @@ public class Helper {
     }
 
     /**
-     * Kill topo on Ctrl-C
+     * Kill topo on Ctrl-C.
      */
     public static void setupShutdownHook(final String topoName) {
         Map<String, Object> clusterConf = Utils.readStormConfig();
diff --git a/examples/storm-perf/src/main/java/org/apache/storm/perf/utils/MetricsSample.java b/examples/storm-perf/src/main/java/org/apache/storm/perf/utils/MetricsSample.java
index b70d06a..02e223e 100755
--- a/examples/storm-perf/src/main/java/org/apache/storm/perf/utils/MetricsSample.java
+++ b/examples/storm-perf/src/main/java/org/apache/storm/perf/utils/MetricsSample.java
@@ -82,8 +82,6 @@ public class MetricsSample {
         // number of spout executors
         int spoutExecCount = 0;
         double spoutLatencySum = 0.0;
-
-        long spoutEmitted = 0L;
         long spoutTransferred = 0L;
 
         // Executor summaries
@@ -156,6 +154,8 @@ public class MetricsSample {
         ret.totalAcked = totalAcked;
         ret.totalFailed = totalFailed;
         ret.totalLatency = spoutLatencySum / spoutExecCount;
+
+        long spoutEmitted = 0L;
         ret.spoutEmitted = spoutEmitted;
         ret.spoutTransferred = spoutTransferred;
         ret.sampleTime = System.currentTimeMillis();