You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by sr...@apache.org on 2019/07/04 18:10:54 UTC
[storm] branch master updated: STORM-3450: perf: fix all checkstyle
warnings
This is an automated email from the ASF dual-hosted git repository.
srdo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/storm.git
The following commit(s) were added to refs/heads/master by this push:
new 7bf7268 STORM-3450: perf: fix all checkstyle warnings
new 67ae024 Merge pull request #3066 from krichter722/checkstyle-perf
7bf7268 is described below
commit 7bf7268e25a627882ddccc68fdf58df9889011a5
Author: Karl-Philipp Richter <kr...@posteo.de>
AuthorDate: Mon Jul 1 22:18:27 2019 +0200
STORM-3450: perf: fix all checkstyle warnings
---
examples/storm-perf/pom.xml | 2 +-
.../storm/perf/ConstSpoutIdBoltNullBoltTopo.java | 2 +-
.../apache/storm/perf/ConstSpoutNullBoltTopo.java | 13 +-
.../org/apache/storm/perf/ConstSpoutOnlyTopo.java | 11 +-
.../apache/storm/perf/FileReadWordCountTopo.java | 6 +-
.../apache/storm/perf/HdfsSpoutNullBoltTopo.java | 10 +-
.../org/apache/storm/perf/KafkaClientHdfsTopo.java | 16 +-
.../org/apache/storm/perf/LowThroughputTopo.java | 1 -
.../apache/storm/perf/StrGenSpoutHdfsBoltTopo.java | 20 +-
.../org/apache/storm/perf/ThroughputMeter.java | 1 +
.../org/apache/storm/perf/bolt/DevNullBolt.java | 4 +-
.../org/apache/storm/perf/queuetest/Acker.java | 62 ++++++
.../storm/perf/queuetest/AckingProducer.java | 62 ++++++
.../org/apache/storm/perf/queuetest/Consumer.java | 63 ++++++
.../org/apache/storm/perf/queuetest/Forwarder.java | 70 ++++++
.../perf/{ => queuetest}/JCQueuePerfTest.java | 239 +--------------------
.../org/apache/storm/perf/queuetest/MyThread.java | 36 ++++
.../org/apache/storm/perf/queuetest/Producer.java | 44 ++++
.../org/apache/storm/perf/queuetest/Producer2.java | 50 +++++
.../org/apache/storm/perf/spout/WordGenSpout.java | 3 +-
.../java/org/apache/storm/perf/toolstest/Cons.java | 61 ++++++
.../perf/{ => toolstest}/JCToolsPerfTest.java | 108 +---------
.../org/apache/storm/perf/toolstest/MyThd.java | 37 ++++
.../java/org/apache/storm/perf/toolstest/Prod.java | 46 ++++
.../org/apache/storm/perf/toolstest/Prod2.java | 46 ++++
.../java/org/apache/storm/perf/utils/Helper.java | 2 +-
.../org/apache/storm/perf/utils/MetricsSample.java | 4 +-
27 files changed, 627 insertions(+), 392 deletions(-)
diff --git a/examples/storm-perf/pom.xml b/examples/storm-perf/pom.xml
index 527ef49..07c4b17 100644
--- a/examples/storm-perf/pom.xml
+++ b/examples/storm-perf/pom.xml
@@ -92,7 +92,7 @@
<artifactId>maven-checkstyle-plugin</artifactId>
<!--Note - the version would be inherited-->
<configuration>
- <maxAllowedViolations>65</maxAllowedViolations>
+ <maxAllowedViolations>0</maxAllowedViolations>
</configuration>
</plugin>
<plugin>
diff --git a/examples/storm-perf/src/main/java/org/apache/storm/perf/ConstSpoutIdBoltNullBoltTopo.java b/examples/storm-perf/src/main/java/org/apache/storm/perf/ConstSpoutIdBoltNullBoltTopo.java
index a65efe6..c83763d 100644
--- a/examples/storm-perf/src/main/java/org/apache/storm/perf/ConstSpoutIdBoltNullBoltTopo.java
+++ b/examples/storm-perf/src/main/java/org/apache/storm/perf/ConstSpoutIdBoltNullBoltTopo.java
@@ -30,7 +30,7 @@ import org.apache.storm.utils.Utils;
/**
* ConstSpout -> IdBolt -> DevNullBolt This topology measures speed of messaging between spouts->bolt and bolt->bolt ConstSpout :
- * Continuously emits a constant string IdBolt : clones and emits input tuples DevNullBolt : discards incoming tuples
+ * Continuously emits a constant string IdBolt : clones and emits input tuples DevNullBolt : discards incoming tuples.
*/
public class ConstSpoutIdBoltNullBoltTopo {
diff --git a/examples/storm-perf/src/main/java/org/apache/storm/perf/ConstSpoutNullBoltTopo.java b/examples/storm-perf/src/main/java/org/apache/storm/perf/ConstSpoutNullBoltTopo.java
index 9683e08..63ef51b 100755
--- a/examples/storm-perf/src/main/java/org/apache/storm/perf/ConstSpoutNullBoltTopo.java
+++ b/examples/storm-perf/src/main/java/org/apache/storm/perf/ConstSpoutNullBoltTopo.java
@@ -28,12 +28,13 @@ import org.apache.storm.topology.BoltDeclarer;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.utils.Utils;
-/***
- * This topo helps measure the messaging peak throughput between a spout and a bolt.
- * Spout generates a stream of a fixed string.
- * Bolt will simply ack and discard the tuple received
+/**
+ * This topo helps measure the messaging peak throughput between a spout and a bolt.
+ *
+ * <p>Spout generates a stream of a fixed string.
+ *
+ * <p>Bolt will simply ack and discard the tuple received.
*/
-
public class ConstSpoutNullBoltTopo {
public static final String TOPOLOGY_NAME = "ConstSpoutNullBoltTopo";
@@ -79,7 +80,7 @@ public class ConstSpoutNullBoltTopo {
}
/**
- * ConstSpout -> DevNullBolt with configurable grouping (default localOrShuffle)
+ * ConstSpout -> DevNullBolt with configurable grouping (default localOrShuffle).
*/
public static void main(String[] args) throws Exception {
int runTime = -1;
diff --git a/examples/storm-perf/src/main/java/org/apache/storm/perf/ConstSpoutOnlyTopo.java b/examples/storm-perf/src/main/java/org/apache/storm/perf/ConstSpoutOnlyTopo.java
index 5848cbc..1b012fe 100755
--- a/examples/storm-perf/src/main/java/org/apache/storm/perf/ConstSpoutOnlyTopo.java
+++ b/examples/storm-perf/src/main/java/org/apache/storm/perf/ConstSpoutOnlyTopo.java
@@ -25,12 +25,11 @@ import org.apache.storm.perf.utils.Helper;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.utils.Utils;
-
-/***
- * This topo helps measure how fast a spout can produce data (so no bolts are attached)
- * Spout generates a stream of a fixed string.
+/**
+ * This topo helps measure how fast a spout can produce data (so no bolts are attached).
+ *
+ * <p>Spout generates a stream of a fixed string.
*/
-
public class ConstSpoutOnlyTopo {
public static final String TOPOLOGY_NAME = "ConstSpoutOnlyTopo";
@@ -49,7 +48,7 @@ public class ConstSpoutOnlyTopo {
}
/**
- * ConstSpout only topology (No bolts)
+ * ConstSpout only topology (No bolts).
*/
public static void main(String[] args) throws Exception {
int runTime = -1;
diff --git a/examples/storm-perf/src/main/java/org/apache/storm/perf/FileReadWordCountTopo.java b/examples/storm-perf/src/main/java/org/apache/storm/perf/FileReadWordCountTopo.java
index 827337e..78eaab7 100644
--- a/examples/storm-perf/src/main/java/org/apache/storm/perf/FileReadWordCountTopo.java
+++ b/examples/storm-perf/src/main/java/org/apache/storm/perf/FileReadWordCountTopo.java
@@ -29,11 +29,11 @@ import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.tuple.Fields;
import org.apache.storm.utils.Utils;
-/***
+/**
* This topo helps measure speed of word count.
- * Spout loads a file into memory on initialization, then emits the lines in an endless loop.
+ *
+ * <p>Spout loads a file into memory on initialization, then emits the lines in an endless loop.
*/
-
public class FileReadWordCountTopo {
public static final String SPOUT_ID = "spout";
public static final String COUNT_ID = "counter";
diff --git a/examples/storm-perf/src/main/java/org/apache/storm/perf/HdfsSpoutNullBoltTopo.java b/examples/storm-perf/src/main/java/org/apache/storm/perf/HdfsSpoutNullBoltTopo.java
index 4f97dfa..9876cac 100644
--- a/examples/storm-perf/src/main/java/org/apache/storm/perf/HdfsSpoutNullBoltTopo.java
+++ b/examples/storm-perf/src/main/java/org/apache/storm/perf/HdfsSpoutNullBoltTopo.java
@@ -28,13 +28,13 @@ import org.apache.storm.perf.utils.Helper;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.utils.Utils;
-/***
+/**
* This topo helps measure speed of reading from Hdfs.
- * Spout Reads from Hdfs.
- * Bolt acks and discards tuples
+ *
+ * <p>Spout Reads from Hdfs.
+ *
+ * <p>Bolt acks and discards tuples.
*/
-
-
public class HdfsSpoutNullBoltTopo {
public static final int DEFAULT_SPOUT_NUM = 1;
public static final int DEFAULT_BOLT_NUM = 1;
diff --git a/examples/storm-perf/src/main/java/org/apache/storm/perf/KafkaClientHdfsTopo.java b/examples/storm-perf/src/main/java/org/apache/storm/perf/KafkaClientHdfsTopo.java
index e269873..5eb61b2 100755
--- a/examples/storm-perf/src/main/java/org/apache/storm/perf/KafkaClientHdfsTopo.java
+++ b/examples/storm-perf/src/main/java/org/apache/storm/perf/KafkaClientHdfsTopo.java
@@ -38,12 +38,13 @@ import org.apache.storm.tuple.Tuple;
import org.apache.storm.utils.ObjectReader;
import org.apache.storm.utils.Utils;
-/***
+/**
* This topo helps measure speed of reading from Kafka and writing to Hdfs.
- * Spout Reads from Kafka.
- * Bolt writes to Hdfs
+ *
+ * <p>Spout Reads from Kafka.
+ *
+ * <p>Bolt writes to Hdfs.
*/
-
public class KafkaClientHdfsTopo {
// configs - topo parallelism
@@ -123,7 +124,7 @@ public class KafkaClientHdfsTopo {
/**
- * Copies text file content from sourceDir to destinationDir. Moves source files into sourceDir after its done consuming
+ * Copies text file content from sourceDir to destinationDir. Moves source files into sourceDir after its done consuming.
*/
public static void main(String[] args) throws Exception {
@@ -132,7 +133,6 @@ public class KafkaClientHdfsTopo {
return;
}
- Integer durationSec = Integer.parseInt(args[0]);
String confFile = args[1];
Map<String, Object> topoConf = Utils.findAndReadConfigFile(confFile);
topoConf.put(Config.TOPOLOGY_PRODUCER_BATCH_SIZE, 1000);
@@ -143,6 +143,7 @@ public class KafkaClientHdfsTopo {
topoConf.putAll(Utils.readCommandLineOpts());
// Submit topology to Storm cluster
+ Integer durationSec = Integer.parseInt(args[0]);
Helper.runOnClusterAndPrintMetrics(durationSec, TOPOLOGY_NAME, topoConf, getTopology(topoConf));
}
@@ -156,9 +157,6 @@ public class KafkaClientHdfsTopo {
/**
* Overrides the default record delimiter.
- *
- * @param delimiter
- * @return
*/
public LineWriter withLineDelimiter(String delimiter) {
this.lineDelimiter = delimiter;
diff --git a/examples/storm-perf/src/main/java/org/apache/storm/perf/LowThroughputTopo.java b/examples/storm-perf/src/main/java/org/apache/storm/perf/LowThroughputTopo.java
index e4972a4..38e35bc 100644
--- a/examples/storm-perf/src/main/java/org/apache/storm/perf/LowThroughputTopo.java
+++ b/examples/storm-perf/src/main/java/org/apache/storm/perf/LowThroughputTopo.java
@@ -18,7 +18,6 @@
package org.apache.storm.perf;
-
import java.util.Collections;
import java.util.List;
import java.util.Map;
diff --git a/examples/storm-perf/src/main/java/org/apache/storm/perf/StrGenSpoutHdfsBoltTopo.java b/examples/storm-perf/src/main/java/org/apache/storm/perf/StrGenSpoutHdfsBoltTopo.java
index fa0d90b..0c53db9 100755
--- a/examples/storm-perf/src/main/java/org/apache/storm/perf/StrGenSpoutHdfsBoltTopo.java
+++ b/examples/storm-perf/src/main/java/org/apache/storm/perf/StrGenSpoutHdfsBoltTopo.java
@@ -36,12 +36,13 @@ import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.utils.Utils;
-/***
- * This topo helps measure speed of writing to Hdfs
- * Spout generates fixed length random strings.
- * Bolt writes to Hdfs
+/**
+ * This topo helps measure speed of writing to Hdfs.
+ *
+ * <p>Spout generates fixed length random strings.
+ *
+ * <p>Bolt writes to Hdfs.
*/
-
public class StrGenSpoutHdfsBoltTopo {
// configs - topo parallelism
@@ -71,7 +72,7 @@ public class StrGenSpoutHdfsBoltTopo {
// 2 - Setup HFS Bolt --------
- String Hdfs_url = Helper.getStr(topoConf, HDFS_URI);
+ String hdfsUrl = Helper.getStr(topoConf, HDFS_URI);
RecordFormat format = new LineWriter("str");
SyncPolicy syncPolicy = new CountSyncPolicy(hdfsBatch);
FileRotationPolicy rotationPolicy = new FileSizeRotationPolicy(1.0f, FileSizeRotationPolicy.Units.GB);
@@ -83,7 +84,7 @@ public class StrGenSpoutHdfsBoltTopo {
// Instantiate the HdfsBolt
HdfsBolt bolt = new HdfsBolt()
- .withFsUrl(Hdfs_url)
+ .withFsUrl(hdfsUrl)
.withFileNameFormat(fileNameFormat)
.withRecordFormat(format)
.withRotationPolicy(rotationPolicy)
@@ -102,7 +103,7 @@ public class StrGenSpoutHdfsBoltTopo {
/**
- * Spout generates random strings and HDFS bolt writes them to a text file
+ * Spout generates random strings and HDFS bolt writes them to a text file.
*/
public static void main(String[] args) throws Exception {
String confFile = "conf/HdfsSpoutTopo.yaml";
@@ -144,9 +145,6 @@ public class StrGenSpoutHdfsBoltTopo {
/**
* Overrides the default record delimiter.
- *
- * @param delimiter
- * @return
*/
public LineWriter withLineDelimiter(String delimiter) {
this.lineDelimiter = delimiter;
diff --git a/examples/storm-perf/src/main/java/org/apache/storm/perf/ThroughputMeter.java b/examples/storm-perf/src/main/java/org/apache/storm/perf/ThroughputMeter.java
index bd5d1e1..3d8e736 100644
--- a/examples/storm-perf/src/main/java/org/apache/storm/perf/ThroughputMeter.java
+++ b/examples/storm-perf/src/main/java/org/apache/storm/perf/ThroughputMeter.java
@@ -31,6 +31,7 @@ public class ThroughputMeter {
}
/**
+ * Calculate throughput.
* @return events/sec
*/
private static double calcThroughput(long count, long startTime, long endTime) {
diff --git a/examples/storm-perf/src/main/java/org/apache/storm/perf/bolt/DevNullBolt.java b/examples/storm-perf/src/main/java/org/apache/storm/perf/bolt/DevNullBolt.java
index abb397d..cc181c0 100755
--- a/examples/storm-perf/src/main/java/org/apache/storm/perf/bolt/DevNullBolt.java
+++ b/examples/storm-perf/src/main/java/org/apache/storm/perf/bolt/DevNullBolt.java
@@ -33,7 +33,7 @@ public class DevNullBolt extends BaseRichBolt {
private static final org.slf4j.Logger LOG = LoggerFactory.getLogger(DevNullBolt.class);
private OutputCollector collector;
private Long sleepNanos;
- private int eCount = 0;
+ private int count = 0;
@Override
public void prepare(Map<String, Object> topoConf, TopologyContext context, OutputCollector collector) {
@@ -47,7 +47,7 @@ public class DevNullBolt extends BaseRichBolt {
if (sleepNanos > 0) {
LockSupport.parkNanos(sleepNanos);
}
- ++eCount;
+ ++count;
}
@Override
diff --git a/examples/storm-perf/src/main/java/org/apache/storm/perf/queuetest/Acker.java b/examples/storm-perf/src/main/java/org/apache/storm/perf/queuetest/Acker.java
new file mode 100644
index 0000000..e936329
--- /dev/null
+++ b/examples/storm-perf/src/main/java/org/apache/storm/perf/queuetest/Acker.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License
+ */
+
+package org.apache.storm.perf.queuetest;
+
+import org.apache.storm.utils.JCQueue;
+
+/**
+ * Reads from ackerInQ and writes to spout queue.
+ */
+class Acker extends MyThread {
+ private final JCQueue ackerInQ;
+ private final JCQueue spoutInQ;
+
+ public Acker(JCQueue ackerInQ, JCQueue spoutInQ) {
+ super("Acker");
+ this.ackerInQ = ackerInQ;
+ this.spoutInQ = spoutInQ;
+ }
+
+
+ @Override
+ public void run() {
+ long start = System.currentTimeMillis();
+ Handler handler = new Handler();
+ while (!Thread.interrupted()) {
+ ackerInQ.consume(handler);
+ }
+ runTime = System.currentTimeMillis() - start;
+ }
+
+ private class Handler implements JCQueue.Consumer {
+ @Override
+ public void accept(Object event) {
+ try {
+ spoutInQ.publish(event);
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Override
+ public void flush() throws InterruptedException {
+ spoutInQ.flush();
+ }
+ }
+}
diff --git a/examples/storm-perf/src/main/java/org/apache/storm/perf/queuetest/AckingProducer.java b/examples/storm-perf/src/main/java/org/apache/storm/perf/queuetest/AckingProducer.java
new file mode 100644
index 0000000..132d793
--- /dev/null
+++ b/examples/storm-perf/src/main/java/org/apache/storm/perf/queuetest/AckingProducer.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License
+ */
+
+package org.apache.storm.perf.queuetest;
+
+import org.apache.storm.utils.JCQueue;
+
+/**
+ * Writes to two queues.
+ */
+class AckingProducer extends MyThread {
+ private final JCQueue ackerInQ;
+ private final JCQueue spoutInQ;
+
+ public AckingProducer(JCQueue ackerInQ, JCQueue spoutInQ) {
+ super("AckingProducer");
+ this.ackerInQ = ackerInQ;
+ this.spoutInQ = spoutInQ;
+ }
+
+ @Override
+ public void run() {
+ try {
+ Handler handler = new Handler();
+ long start = System.currentTimeMillis();
+ while (!Thread.interrupted()) {
+ int x = spoutInQ.consume(handler);
+ ackerInQ.publish(count);
+ }
+ runTime = System.currentTimeMillis() - start;
+ } catch (InterruptedException e) {
+ return;
+ }
+ }
+
+ private class Handler implements JCQueue.Consumer {
+ @Override
+ public void accept(Object event) {
+ // no-op
+ }
+
+ @Override
+ public void flush() {
+ // no-op
+ }
+ }
+}
diff --git a/examples/storm-perf/src/main/java/org/apache/storm/perf/queuetest/Consumer.java b/examples/storm-perf/src/main/java/org/apache/storm/perf/queuetest/Consumer.java
new file mode 100644
index 0000000..52ff210
--- /dev/null
+++ b/examples/storm-perf/src/main/java/org/apache/storm/perf/queuetest/Consumer.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License
+ */
+
+package org.apache.storm.perf.queuetest;
+
+import java.util.concurrent.locks.LockSupport;
+import org.apache.storm.utils.JCQueue;
+import org.apache.storm.utils.MutableLong;
+
+class Consumer extends MyThread {
+ public final MutableLong counter = new MutableLong(0);
+ private final JCQueue queue;
+
+ public Consumer(JCQueue queue) {
+ super("Consumer");
+ this.queue = queue;
+ }
+
+ @Override
+ public void run() {
+ Handler handler = new Handler();
+ long start = System.currentTimeMillis();
+ while (!Thread.interrupted()) {
+ int x = queue.consume(handler);
+ if (x == 0) {
+ LockSupport.parkNanos(1);
+ }
+ }
+ runTime = System.currentTimeMillis() - start;
+ }
+
+ @Override
+ public long getCount() {
+ return counter.get();
+ }
+
+ private class Handler implements JCQueue.Consumer {
+ @Override
+ public void accept(Object event) {
+ counter.increment();
+ }
+
+ @Override
+ public void flush() {
+ // no-op
+ }
+ }
+}
diff --git a/examples/storm-perf/src/main/java/org/apache/storm/perf/queuetest/Forwarder.java b/examples/storm-perf/src/main/java/org/apache/storm/perf/queuetest/Forwarder.java
new file mode 100644
index 0000000..2f951dd
--- /dev/null
+++ b/examples/storm-perf/src/main/java/org/apache/storm/perf/queuetest/Forwarder.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License
+ */
+
+package org.apache.storm.perf.queuetest;
+
+import java.util.concurrent.locks.LockSupport;
+import org.apache.storm.utils.JCQueue;
+import org.apache.storm.utils.MutableLong;
+
+class Forwarder extends MyThread {
+ public final MutableLong counter = new MutableLong(0);
+ private final JCQueue inq;
+ private final JCQueue outq;
+
+ public Forwarder(JCQueue inq, JCQueue outq) {
+ super("Forwarder");
+ this.inq = inq;
+ this.outq = outq;
+ }
+
+ @Override
+ public void run() {
+ Handler handler = new Handler();
+ long start = System.currentTimeMillis();
+ while (!Thread.interrupted()) {
+ int x = inq.consume(handler);
+ if (x == 0) {
+ LockSupport.parkNanos(1);
+ }
+ }
+ runTime = System.currentTimeMillis() - start;
+ }
+
+ @Override
+ public long getCount() {
+ return counter.get();
+ }
+
+ private class Handler implements JCQueue.Consumer {
+ @Override
+ public void accept(Object event) {
+ try {
+ outq.publish(event);
+ counter.increment();
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Override
+ public void flush() {
+ // no-op
+ }
+ }
+}
diff --git a/examples/storm-perf/src/main/java/org/apache/storm/perf/JCQueuePerfTest.java b/examples/storm-perf/src/main/java/org/apache/storm/perf/queuetest/JCQueuePerfTest.java
similarity index 50%
rename from examples/storm-perf/src/main/java/org/apache/storm/perf/JCQueuePerfTest.java
rename to examples/storm-perf/src/main/java/org/apache/storm/perf/queuetest/JCQueuePerfTest.java
index 6ba56a9..13d880b 100644
--- a/examples/storm-perf/src/main/java/org/apache/storm/perf/JCQueuePerfTest.java
+++ b/examples/storm-perf/src/main/java/org/apache/storm/perf/queuetest/JCQueuePerfTest.java
@@ -16,7 +16,7 @@
* limitations under the License
*/
-package org.apache.storm.perf;
+package org.apache.storm.perf.queuetest;
import java.util.concurrent.locks.LockSupport;
import org.apache.storm.metrics2.StormMetricRegistry;
@@ -24,6 +24,7 @@ import org.apache.storm.policy.WaitStrategyPark;
import org.apache.storm.utils.JCQueue;
import org.apache.storm.utils.MutableLong;
+@SuppressWarnings("checkstyle:AbbreviationAsWordInName")
public class JCQueuePerfTest {
// Usage: Let it and then explicitly terminate.
// Metrics will be printed when application is terminated.
@@ -147,240 +148,4 @@ public class JCQueuePerfTest {
}));
}
-
-}
-
-
-abstract class MyThread extends Thread {
- public long count = 0;
- public long runTime = 0;
-
- public MyThread(String thdName) {
- super(thdName);
- }
-
- public long throughput() {
- return getCount() / (runTime / 1000);
- }
-
- public long getCount() {
- return count;
- }
-}
-
-class Producer extends MyThread {
- private final JCQueue q;
-
- public Producer(JCQueue q) {
- super("Producer");
- this.q = q;
- }
-
- @Override
- public void run() {
- try {
- long start = System.currentTimeMillis();
- while (!Thread.interrupted()) {
- q.publish(++count);
- }
- runTime = System.currentTimeMillis() - start;
- } catch (InterruptedException e) {
- return;
- }
- }
-
-}
-
-// writes to two queues
-class Producer2 extends MyThread {
- private final JCQueue q1;
- private final JCQueue q2;
-
- public Producer2(JCQueue q1, JCQueue q2) {
- super("Producer2");
- this.q1 = q1;
- this.q2 = q2;
- }
-
- @Override
- public void run() {
- try {
- long start = System.currentTimeMillis();
- while (!Thread.interrupted()) {
- q1.publish(++count);
- q2.publish(count);
- }
- runTime = System.currentTimeMillis() - start;
- } catch (InterruptedException e) {
- return;
- }
-
- }
}
-
-
-// writes to two queues
-class AckingProducer extends MyThread {
- private final JCQueue ackerInQ;
- private final JCQueue spoutInQ;
-
- public AckingProducer(JCQueue ackerInQ, JCQueue spoutInQ) {
- super("AckingProducer");
- this.ackerInQ = ackerInQ;
- this.spoutInQ = spoutInQ;
- }
-
- @Override
- public void run() {
- try {
- Handler handler = new Handler();
- long start = System.currentTimeMillis();
- while (!Thread.interrupted()) {
- int x = spoutInQ.consume(handler);
- ackerInQ.publish(count);
- }
- runTime = System.currentTimeMillis() - start;
- } catch (InterruptedException e) {
- return;
- }
- }
-
- private class Handler implements JCQueue.Consumer {
- @Override
- public void accept(Object event) {
- // no-op
- }
-
- @Override
- public void flush() {
- // no-op
- }
- }
-}
-
-// reads from ackerInQ and writes to spout queue
-class Acker extends MyThread {
- private final JCQueue ackerInQ;
- private final JCQueue spoutInQ;
-
- public Acker(JCQueue ackerInQ, JCQueue spoutInQ) {
- super("Acker");
- this.ackerInQ = ackerInQ;
- this.spoutInQ = spoutInQ;
- }
-
-
- @Override
- public void run() {
- long start = System.currentTimeMillis();
- Handler handler = new Handler();
- while (!Thread.interrupted()) {
- ackerInQ.consume(handler);
- }
- runTime = System.currentTimeMillis() - start;
- }
-
- private class Handler implements JCQueue.Consumer {
- @Override
- public void accept(Object event) {
- try {
- spoutInQ.publish(event);
- } catch (InterruptedException e) {
- throw new RuntimeException(e);
- }
- }
-
- @Override
- public void flush() throws InterruptedException {
- spoutInQ.flush();
- }
- }
-}
-
-class Consumer extends MyThread {
- public final MutableLong counter = new MutableLong(0);
- private final JCQueue q;
-
- public Consumer(JCQueue q) {
- super("Consumer");
- this.q = q;
- }
-
- @Override
- public void run() {
- Handler handler = new Handler();
- long start = System.currentTimeMillis();
- while (!Thread.interrupted()) {
- int x = q.consume(handler);
- if (x == 0) {
- LockSupport.parkNanos(1);
- }
- }
- runTime = System.currentTimeMillis() - start;
- }
-
- @Override
- public long getCount() {
- return counter.get();
- }
-
- private class Handler implements JCQueue.Consumer {
- @Override
- public void accept(Object event) {
- counter.increment();
- }
-
- @Override
- public void flush() {
- // no-op
- }
- }
-}
-
-
-class Forwarder extends MyThread {
- public final MutableLong counter = new MutableLong(0);
- private final JCQueue inq;
- private final JCQueue outq;
-
- public Forwarder(JCQueue inq, JCQueue outq) {
- super("Forwarder");
- this.inq = inq;
- this.outq = outq;
- }
-
- @Override
- public void run() {
- Handler handler = new Handler();
- long start = System.currentTimeMillis();
- while (!Thread.interrupted()) {
- int x = inq.consume(handler);
- if (x == 0) {
- LockSupport.parkNanos(1);
- }
- }
- runTime = System.currentTimeMillis() - start;
- }
-
- @Override
- public long getCount() {
- return counter.get();
- }
-
- private class Handler implements JCQueue.Consumer {
- @Override
- public void accept(Object event) {
- try {
- outq.publish(event);
- counter.increment();
- } catch (InterruptedException e) {
- throw new RuntimeException(e);
- }
- }
-
- @Override
- public void flush() {
- // no-op
- }
- }
-}
\ No newline at end of file
diff --git a/examples/storm-perf/src/main/java/org/apache/storm/perf/queuetest/MyThread.java b/examples/storm-perf/src/main/java/org/apache/storm/perf/queuetest/MyThread.java
new file mode 100644
index 0000000..4c84316
--- /dev/null
+++ b/examples/storm-perf/src/main/java/org/apache/storm/perf/queuetest/MyThread.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License
+ */
+
+package org.apache.storm.perf.queuetest;
+
+abstract class MyThread extends Thread {
+ public long count = 0;
+ public long runTime = 0;
+
+ public MyThread(String thdName) {
+ super(thdName);
+ }
+
+ public long throughput() {
+ return getCount() / (runTime / 1000);
+ }
+
+ public long getCount() {
+ return count;
+ }
+}
diff --git a/examples/storm-perf/src/main/java/org/apache/storm/perf/queuetest/Producer.java b/examples/storm-perf/src/main/java/org/apache/storm/perf/queuetest/Producer.java
new file mode 100644
index 0000000..eb02d5e
--- /dev/null
+++ b/examples/storm-perf/src/main/java/org/apache/storm/perf/queuetest/Producer.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License
+ */
+
+package org.apache.storm.perf.queuetest;
+
+import org.apache.storm.utils.JCQueue;
+
+class Producer extends MyThread {
+ private final JCQueue queue;
+
+ public Producer(JCQueue queue) {
+ super("Producer");
+ this.queue = queue;
+ }
+
+ @Override
+ public void run() {
+ try {
+ long start = System.currentTimeMillis();
+ while (!Thread.interrupted()) {
+ queue.publish(++count);
+ }
+ runTime = System.currentTimeMillis() - start;
+ } catch (InterruptedException e) {
+ return;
+ }
+ }
+
+}
diff --git a/examples/storm-perf/src/main/java/org/apache/storm/perf/queuetest/Producer2.java b/examples/storm-perf/src/main/java/org/apache/storm/perf/queuetest/Producer2.java
new file mode 100644
index 0000000..8df519b
--- /dev/null
+++ b/examples/storm-perf/src/main/java/org/apache/storm/perf/queuetest/Producer2.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License
+ */
+
+package org.apache.storm.perf.queuetest;
+
+import org.apache.storm.utils.JCQueue;
+
+/**
+ * Writes to two queues.
+ */
+class Producer2 extends MyThread {
+ private final JCQueue q1;
+ private final JCQueue q2;
+
+ public Producer2(JCQueue q1, JCQueue q2) {
+ super("Producer2");
+ this.q1 = q1;
+ this.q2 = q2;
+ }
+
+ @Override
+ public void run() {
+ try {
+ long start = System.currentTimeMillis();
+ while (!Thread.interrupted()) {
+ q1.publish(++count);
+ q2.publish(count);
+ }
+ runTime = System.currentTimeMillis() - start;
+ } catch (InterruptedException e) {
+ return;
+ }
+
+ }
+}
\ No newline at end of file
diff --git a/examples/storm-perf/src/main/java/org/apache/storm/perf/spout/WordGenSpout.java b/examples/storm-perf/src/main/java/org/apache/storm/perf/spout/WordGenSpout.java
index f46d6f3..a6a1fc9 100644
--- a/examples/storm-perf/src/main/java/org/apache/storm/perf/spout/WordGenSpout.java
+++ b/examples/storm-perf/src/main/java/org/apache/storm/perf/spout/WordGenSpout.java
@@ -63,8 +63,9 @@ public class WordGenSpout extends BaseRichSpout {
try {
String line;
while ((line = reader.readLine()) != null) {
- for (String word : line.split("\\s+"))
+ for (String word : line.split("\\s+")) {
lines.add(word);
+ }
}
} catch (IOException e) {
throw new RuntimeException("Reading file failed", e);
diff --git a/examples/storm-perf/src/main/java/org/apache/storm/perf/toolstest/Cons.java b/examples/storm-perf/src/main/java/org/apache/storm/perf/toolstest/Cons.java
new file mode 100644
index 0000000..3783310
--- /dev/null
+++ b/examples/storm-perf/src/main/java/org/apache/storm/perf/toolstest/Cons.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License
+ */
+
+package org.apache.storm.perf.toolstest;
+
+import java.util.concurrent.locks.LockSupport;
+import org.apache.storm.utils.MutableLong;
+import org.jctools.queues.MpscArrayQueue;
+
+class Cons extends MyThd {
+ public final MutableLong counter = new MutableLong(0);
+ private final MpscArrayQueue<Object> queue;
+
+ public Cons(MpscArrayQueue<Object> queue) {
+ super("Consumer");
+ this.queue = queue;
+ }
+
+ @Override
+ public void run() {
+ Handler handler = new Handler();
+ long start = System.currentTimeMillis();
+
+ while (!halt) {
+ int x = queue.drain(handler);
+ if (x == 0) {
+ LockSupport.parkNanos(1);
+ } else {
+ counter.increment();
+ }
+ }
+ runTime = System.currentTimeMillis() - start;
+ }
+
+ @Override
+ public long getCount() {
+ return counter.get();
+ }
+
+ private class Handler implements org.jctools.queues.MessagePassingQueue.Consumer<Object> {
+ @Override
+ public void accept(Object event) {
+ counter.increment();
+ }
+ }
+}
diff --git a/examples/storm-perf/src/main/java/org/apache/storm/perf/JCToolsPerfTest.java b/examples/storm-perf/src/main/java/org/apache/storm/perf/toolstest/JCToolsPerfTest.java
similarity index 60%
rename from examples/storm-perf/src/main/java/org/apache/storm/perf/JCToolsPerfTest.java
rename to examples/storm-perf/src/main/java/org/apache/storm/perf/toolstest/JCToolsPerfTest.java
index a439522..b5afe28 100644
--- a/examples/storm-perf/src/main/java/org/apache/storm/perf/JCToolsPerfTest.java
+++ b/examples/storm-perf/src/main/java/org/apache/storm/perf/toolstest/JCToolsPerfTest.java
@@ -16,12 +16,11 @@
* limitations under the License
*/
-package org.apache.storm.perf;
+package org.apache.storm.perf.toolstest;
-import java.util.concurrent.locks.LockSupport;
-import org.apache.storm.utils.MutableLong;
import org.jctools.queues.MpscArrayQueue;
+@SuppressWarnings("checkstyle:AbbreviationAsWordInName")
public class JCToolsPerfTest {
public static void main(String[] args) throws Exception {
// oneProducer1Consumer();
@@ -120,107 +119,4 @@ public class JCToolsPerfTest {
}
-abstract class MyThd extends Thread {
- public long count = 0;
- public long runTime = 0;
- public boolean halt = false;
- public MyThd(String thdName) {
- super(thdName);
- }
-
- public long throughput() {
- return getCount() / (runTime / 1000);
- }
-
- public long getCount() {
- return count;
- }
-}
-
-class Prod extends MyThd {
- private final MpscArrayQueue<Object> q;
-
- public Prod(MpscArrayQueue<Object> q) {
- super("Producer");
- this.q = q;
- }
-
- @Override
- public void run() {
- long start = System.currentTimeMillis();
-
- while (!halt) {
- ++count;
- while (!q.offer(count)) {
- if (Thread.interrupted()) {
- return;
- }
- }
- }
- runTime = System.currentTimeMillis() - start;
- }
-
-}
-
-// writes to two queues
-class Prod2 extends MyThd {
- private final MpscArrayQueue<Object> q1;
- private final MpscArrayQueue<Object> q2;
-
- public Prod2(MpscArrayQueue<Object> q1, MpscArrayQueue<Object> q2) {
- super("Producer2");
- this.q1 = q1;
- this.q2 = q2;
- }
-
- @Override
- public void run() {
- long start = System.currentTimeMillis();
-
- while (!halt) {
- q1.offer(++count);
- q2.offer(count);
- }
- runTime = System.currentTimeMillis() - start;
- }
-}
-
-
-class Cons extends MyThd {
- public final MutableLong counter = new MutableLong(0);
- private final MpscArrayQueue<Object> q;
-
- public Cons(MpscArrayQueue<Object> q) {
- super("Consumer");
- this.q = q;
- }
-
- @Override
- public void run() {
- Handler handler = new Handler();
- long start = System.currentTimeMillis();
-
- while (!halt) {
- int x = q.drain(handler);
- if (x == 0) {
- LockSupport.parkNanos(1);
- } else {
- counter.increment();
- }
- }
- runTime = System.currentTimeMillis() - start;
- }
-
- @Override
- public long getCount() {
- return counter.get();
- }
-
- private class Handler implements org.jctools.queues.MessagePassingQueue.Consumer<Object> {
- @Override
- public void accept(Object event) {
- counter.increment();
- }
- }
-}
diff --git a/examples/storm-perf/src/main/java/org/apache/storm/perf/toolstest/MyThd.java b/examples/storm-perf/src/main/java/org/apache/storm/perf/toolstest/MyThd.java
new file mode 100644
index 0000000..fbd7aab
--- /dev/null
+++ b/examples/storm-perf/src/main/java/org/apache/storm/perf/toolstest/MyThd.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License
+ */
+
+package org.apache.storm.perf.toolstest;
+
+abstract class MyThd extends Thread {
+ public long count = 0;
+ public long runTime = 0;
+ public boolean halt = false;
+
+ public MyThd(String thdName) {
+ super(thdName);
+ }
+
+ public long throughput() {
+ return getCount() / (runTime / 1000);
+ }
+
+ public long getCount() {
+ return count;
+ }
+}
diff --git a/examples/storm-perf/src/main/java/org/apache/storm/perf/toolstest/Prod.java b/examples/storm-perf/src/main/java/org/apache/storm/perf/toolstest/Prod.java
new file mode 100644
index 0000000..3553552
--- /dev/null
+++ b/examples/storm-perf/src/main/java/org/apache/storm/perf/toolstest/Prod.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License
+ */
+
+package org.apache.storm.perf.toolstest;
+
+import org.jctools.queues.MpscArrayQueue;
+
+class Prod extends MyThd {
+ private final MpscArrayQueue<Object> queue;
+
+ public Prod(MpscArrayQueue<Object> queue) {
+ super("Producer");
+ this.queue = queue;
+ }
+
+ @Override
+ public void run() {
+ long start = System.currentTimeMillis();
+
+ while (!halt) {
+ ++count;
+ while (!queue.offer(count)) {
+ if (interrupted()) {
+ return;
+ }
+ }
+ }
+ runTime = System.currentTimeMillis() - start;
+ }
+
+}
\ No newline at end of file
diff --git a/examples/storm-perf/src/main/java/org/apache/storm/perf/toolstest/Prod2.java b/examples/storm-perf/src/main/java/org/apache/storm/perf/toolstest/Prod2.java
new file mode 100644
index 0000000..470522a
--- /dev/null
+++ b/examples/storm-perf/src/main/java/org/apache/storm/perf/toolstest/Prod2.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License
+ */
+
+package org.apache.storm.perf.toolstest;
+
+import org.jctools.queues.MpscArrayQueue;
+
+/**
+ * Writes to two queues.
+ */
+class Prod2 extends MyThd {
+ private final MpscArrayQueue<Object> q1;
+ private final MpscArrayQueue<Object> q2;
+
+ public Prod2(MpscArrayQueue<Object> q1, MpscArrayQueue<Object> q2) {
+ super("Producer2");
+ this.q1 = q1;
+ this.q2 = q2;
+ }
+
+ @Override
+ public void run() {
+ long start = System.currentTimeMillis();
+
+ while (!halt) {
+ q1.offer(++count);
+ q2.offer(count);
+ }
+ runTime = System.currentTimeMillis() - start;
+ }
+}
diff --git a/examples/storm-perf/src/main/java/org/apache/storm/perf/utils/Helper.java b/examples/storm-perf/src/main/java/org/apache/storm/perf/utils/Helper.java
index b3a206c..49be70e 100755
--- a/examples/storm-perf/src/main/java/org/apache/storm/perf/utils/Helper.java
+++ b/examples/storm-perf/src/main/java/org/apache/storm/perf/utils/Helper.java
@@ -68,7 +68,7 @@ public class Helper {
}
/**
- * Kill topo on Ctrl-C
+ * Kill topo on Ctrl-C.
*/
public static void setupShutdownHook(final String topoName) {
Map<String, Object> clusterConf = Utils.readStormConfig();
diff --git a/examples/storm-perf/src/main/java/org/apache/storm/perf/utils/MetricsSample.java b/examples/storm-perf/src/main/java/org/apache/storm/perf/utils/MetricsSample.java
index b70d06a..02e223e 100755
--- a/examples/storm-perf/src/main/java/org/apache/storm/perf/utils/MetricsSample.java
+++ b/examples/storm-perf/src/main/java/org/apache/storm/perf/utils/MetricsSample.java
@@ -82,8 +82,6 @@ public class MetricsSample {
// number of spout executors
int spoutExecCount = 0;
double spoutLatencySum = 0.0;
-
- long spoutEmitted = 0L;
long spoutTransferred = 0L;
// Executor summaries
@@ -156,6 +154,8 @@ public class MetricsSample {
ret.totalAcked = totalAcked;
ret.totalFailed = totalFailed;
ret.totalLatency = spoutLatencySum / spoutExecCount;
+
+ long spoutEmitted = 0L;
ret.spoutEmitted = spoutEmitted;
ret.spoutTransferred = spoutTransferred;
ret.sampleTime = System.currentTimeMillis();