Posted to commits@storm.apache.org by ka...@apache.org on 2018/02/06 07:27:45 UTC
[1/2] storm git commit: STORM-2933: Add a storm-perf topology that uses storm-kafka-client
Repository: storm
Updated Branches:
refs/heads/master b5d70e17d -> 12cc49fcb
STORM-2933: Add a storm-perf topology that uses storm-kafka-client
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/71496cb6
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/71496cb6
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/71496cb6
Branch: refs/heads/master
Commit: 71496cb67ee354d1ee7303094891dd503dca97c6
Parents: 7fbe7a2
Author: Stig Rohde Døssing <sr...@apache.org>
Authored: Sat Feb 3 11:04:54 2018 +0100
Committer: Stig Rohde Døssing <sr...@apache.org>
Committed: Sat Feb 3 18:18:27 2018 +0100
----------------------------------------------------------------------
examples/storm-perf/README.markdown | 15 +--
examples/storm-perf/pom.xml | 10 ++
.../main/conf/KafkaClientSpoutNullBoltTopo.yml | 20 ++++
.../perf/KafkaClientSpoutNullBoltTopo.java | 113 +++++++++++++++++++
.../apache/storm/perf/utils/MetricsSample.java | 18 +--
5 files changed, 161 insertions(+), 15 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/71496cb6/examples/storm-perf/README.markdown
----------------------------------------------------------------------
diff --git a/examples/storm-perf/README.markdown b/examples/storm-perf/README.markdown
index 946ab21..8dea58b 100644
--- a/examples/storm-perf/README.markdown
+++ b/examples/storm-perf/README.markdown
@@ -20,14 +20,15 @@ Topologies that measure I/O with external systems are designed to run in cluster
## Topologies List
-1. **ConstSpoutOnlyTopo:** Helps measure how fast spout can emit. This topology has a spout and is not connected to any bolts. Supports in-proc and cluster mode.
-2. **ConstSpoutNullBoltTopo:** Helps measure how fast spout can send data to a bolt. Spout emits a stream of constant values to a DevNull bolt which discards the incoming tuples. Supports in-proc and cluster mode.
-3. **ConstSpoutIdBoltNullBoltTopo:** Helps measure speed of messaging between spouts and bolts. Spout emits a stream of constant values to an ID bolt which clones the tuple and emits it downstream to a DevNull bolt. Supports in-proc and cluster mode.
-4. **FileReadWordCount:** Measures speed of word counting. The spout loads a file into memory and emits these lines in an infinite loop. Supports in-proc and cluster mode.
-5. **HdfsSpoutNullBolt:** Measures speed at which HdfsSpout can read from HDFS. Supports cluster mode only.
+1. **ConstSpoutOnlyTopo:** Helps measure how fast spout can emit. This topology has a spout and is not connected to any bolts. Supports cluster mode only.
+2. **ConstSpoutNullBoltTopo:** Helps measure how fast spout can send data to a bolt. Spout emits a stream of constant values to a DevNull bolt which discards the incoming tuples. Supports cluster mode only.
+3. **ConstSpoutIdBoltNullBoltTopo:** Helps measure speed of messaging between spouts and bolts. Spout emits a stream of constant values to an ID bolt which clones the tuple and emits it downstream to a DevNull bolt. Supports cluster mode only.
+4. **FileReadWordCountTopo:** Measures speed of word counting. The spout loads a file into memory and emits these lines in an infinite loop. Supports cluster mode only.
+5. **HdfsSpoutNullBoltTopo:** Measures speed at which HdfsSpout can read from HDFS. Supports cluster mode only.
6. **StrGenSpoutHdfsBoltTopo:** Measures speed at which HdfsBolt can write to HDFS. Supports cluster mode only.
-7. **KafkaSpoutNullBolt:** Measures speed at which KafkaSpout can read from Kafka. Supports cluster mode only.
-8. **KafkaHdfsTopo:** Measures how fast Storm can read from Kafka and write to HDFS.
+7. **KafkaSpoutNullBoltTopo:** Measures speed at which the storm-kafka KafkaSpout can read from Kafka. Supports cluster mode only.
+8. **KafkaHdfsTopo:** Measures how fast Storm can read from Kafka and write to HDFS, using the storm-kafka spout. Supports cluster mode only.
+9. **KafkaClientSpoutNullBoltTopo:** Measures the speed at which the storm-kafka-client KafkaSpout can read from Kafka. Supports cluster mode only.
## How to run ?
http://git-wip-us.apache.org/repos/asf/storm/blob/71496cb6/examples/storm-perf/pom.xml
----------------------------------------------------------------------
diff --git a/examples/storm-perf/pom.xml b/examples/storm-perf/pom.xml
index 7dc2579..8fad512 100644
--- a/examples/storm-perf/pom.xml
+++ b/examples/storm-perf/pom.xml
@@ -108,6 +108,16 @@
<artifactId>storm-hdfs</artifactId>
<version>${project.version}</version>
</dependency>
+ <dependency>
+ <groupId>org.apache.storm</groupId>
+ <artifactId>storm-kafka-client</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.kafka</groupId>
+ <artifactId>kafka-clients</artifactId>
+ <scope>compile</scope>
+ </dependency>
</dependencies>
http://git-wip-us.apache.org/repos/asf/storm/blob/71496cb6/examples/storm-perf/src/main/conf/KafkaClientSpoutNullBoltTopo.yml
----------------------------------------------------------------------
diff --git a/examples/storm-perf/src/main/conf/KafkaClientSpoutNullBoltTopo.yml b/examples/storm-perf/src/main/conf/KafkaClientSpoutNullBoltTopo.yml
new file mode 100644
index 0000000..2fb1881
--- /dev/null
+++ b/examples/storm-perf/src/main/conf/KafkaClientSpoutNullBoltTopo.yml
@@ -0,0 +1,20 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+bootstrap.servers: "127.0.0.1:9092"
+kafka.topic: "storm-perf-null-bolt-topic"
+processing.guarantee: "AT_LEAST_ONCE"
+offset.commit.period.ms: 30000
\ No newline at end of file
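The YAML file above only supplies values. KafkaClientSpoutNullBoltTopo falls back to hard-coded defaults for any key it omits (bootstrap servers, topic, processing guarantee, commit period, spout and bolt counts). A minimal standalone sketch of that "value or default" lookup follows, using a plain Map<String, Object> in place of storm-perf's Helper.getStr/getInt. PerfConfDefaults and its methods are hypothetical illustrations, not Storm APIs, and accepting any Number for integers is an assumption about what a YAML loader may return (Integer or Long).

```java
import java.util.Map;

// Hypothetical helper illustrating the "value or default" lookups in
// KafkaClientSpoutNullBoltTopo.getTopology(); this is not storm-perf's Helper class.
final class PerfConfDefaults {
    static final String DEFAULT_BOOTSTRAP_SERVERS = "127.0.0.1:9092";
    static final String DEFAULT_KAFKA_TOPIC = "storm-perf-null-bolt-topic";
    static final int DEFAULT_OFFSET_COMMIT_PERIOD_MS = 30_000;

    private PerfConfDefaults() {
    }

    // Returns the configured value as a String, or the fallback if the key is absent.
    static String getStr(Map<String, Object> conf, String key, String fallback) {
        Object value = conf.get(key);
        return value == null ? fallback : value.toString();
    }

    // Assumption: a YAML loader may hand back Integer or Long for numbers, so any
    // Number is accepted; numeric strings are parsed as a convenience.
    static int getInt(Map<String, Object> conf, String key, int fallback) {
        Object value = conf.get(key);
        if (value == null) {
            return fallback;
        }
        if (value instanceof Number) {
            return ((Number) value).intValue();
        }
        return Integer.parseInt(value.toString().trim());
    }
}
```

For example, if offset.commit.period.ms: 10000 is loaded as a Long, getInt(conf, "offset.commit.period.ms", DEFAULT_OFFSET_COMMIT_PERIOD_MS) returns 10000, while an absent spout.count key yields whatever default the caller passes.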
http://git-wip-us.apache.org/repos/asf/storm/blob/71496cb6/examples/storm-perf/src/main/java/org/apache/storm/perf/KafkaClientSpoutNullBoltTopo.java
----------------------------------------------------------------------
diff --git a/examples/storm-perf/src/main/java/org/apache/storm/perf/KafkaClientSpoutNullBoltTopo.java b/examples/storm-perf/src/main/java/org/apache/storm/perf/KafkaClientSpoutNullBoltTopo.java
new file mode 100644
index 0000000..4d88702
--- /dev/null
+++ b/examples/storm-perf/src/main/java/org/apache/storm/perf/KafkaClientSpoutNullBoltTopo.java
@@ -0,0 +1,113 @@
+/*
+ * Copyright 2018 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.perf;
+
+import java.util.Map;
+import java.util.Optional;
+import org.apache.storm.Config;
+import org.apache.storm.generated.StormTopology;
+import org.apache.storm.kafka.spout.KafkaSpout;
+import org.apache.storm.kafka.spout.KafkaSpoutConfig;
+import org.apache.storm.kafka.spout.KafkaSpoutConfig.ProcessingGuarantee;
+import org.apache.storm.perf.bolt.DevNullBolt;
+import org.apache.storm.perf.utils.Helper;
+import org.apache.storm.topology.TopologyBuilder;
+import org.apache.storm.utils.Utils;
+
+/**
+ * Benchmark topology for measuring spout read/emit/ack performance. The spout reads and emits tuples. The bolt acks and discards received
+ * tuples.
+ */
+public class KafkaClientSpoutNullBoltTopo {
+
+ // configs - topo parallelism
+ public static final String SPOUT_NUM = "spout.count";
+ public static final String BOLT_NUM = "bolt.count";
+
+ // configs - kafka spout
+ public static final String BOOTSTRAP_SERVERS = "bootstrap.servers";
+ public static final String KAFKA_TOPIC = "kafka.topic";
+ public static final String PROCESSING_GUARANTEE = "processing.guarantee";
+ public static final String OFFSET_COMMIT_PERIOD_MS = "offset.commit.period.ms";
+
+ public static final int DEFAULT_SPOUT_NUM = 1;
+ public static final int DEFAULT_BOLT_NUM = 1;
+
+ // names
+ public static final String TOPOLOGY_NAME = KafkaClientSpoutNullBoltTopo.class.getSimpleName();
+ public static final String SPOUT_ID = "kafkaSpout";
+ public static final String BOLT_ID = "devNullBolt";
+
+ /**
+ * Create and configure the topology.
+ */
+ public static StormTopology getTopology(Map<String, Object> config) {
+
+ final int spoutNum = Helper.getInt(config, SPOUT_NUM, DEFAULT_SPOUT_NUM);
+ final int boltNum = Helper.getInt(config, BOLT_NUM, DEFAULT_BOLT_NUM);
+ // 1 - Setup Kafka Spout --------
+
+ String bootstrapServers = Optional.ofNullable(Helper.getStr(config, BOOTSTRAP_SERVERS)).orElse("127.0.0.1:9092");
+ String kafkaTopic = Optional.ofNullable(Helper.getStr(config, KAFKA_TOPIC)).orElse("storm-perf-null-bolt-topic");
+ ProcessingGuarantee processingGuarantee = ProcessingGuarantee.valueOf(
+ Optional.ofNullable(Helper.getStr(config, PROCESSING_GUARANTEE))
+ .orElse(ProcessingGuarantee.AT_LEAST_ONCE.name()));
+ int offsetCommitPeriodMs = Helper.getInt(config, OFFSET_COMMIT_PERIOD_MS, 30_000);
+
+ KafkaSpoutConfig<String, String> kafkaSpoutConfig = KafkaSpoutConfig.builder(bootstrapServers, kafkaTopic)
+ .setProcessingGuarantee(processingGuarantee)
+ .setOffsetCommitPeriodMs(offsetCommitPeriodMs)
+ .setFirstPollOffsetStrategy(KafkaSpoutConfig.FirstPollOffsetStrategy.EARLIEST)
+ .setTupleTrackingEnforced(true)
+ .build();
+
+ KafkaSpout<String, String> spout = new KafkaSpout<>(kafkaSpoutConfig);
+
+ // 2 - DevNull Bolt --------
+ DevNullBolt bolt = new DevNullBolt();
+
+ // 3 - Setup Topology --------
+ TopologyBuilder builder = new TopologyBuilder();
+ builder.setSpout(SPOUT_ID, spout, spoutNum);
+ builder.setBolt(BOLT_ID, bolt, boltNum)
+ .localOrShuffleGrouping(SPOUT_ID);
+
+ return builder.createTopology();
+ }
+
+ /**
+ * Start the topology.
+ */
+ public static void main(String[] args) throws Exception {
+ int durationSec = -1;
+ Config topoConf = new Config();
+ if (args.length > 0) {
+ durationSec = Integer.parseInt(args[0]);
+ }
+ if (args.length > 1) {
+ topoConf.putAll(Utils.findAndReadConfigFile(args[1]));
+ }
+ if (args.length > 2) {
+ System.err.println("args: [runDurationSec] [optionalConfFile]");
+ return;
+ }
+
+ // Submit to Storm cluster
+ Helper.runOnClusterAndPrintMetrics(durationSec, TOPOLOGY_NAME, topoConf, getTopology(topoConf));
+ }
+
+}
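getTopology() resolves processing.guarantee with ProcessingGuarantee.valueOf(...), so a missing key becomes AT_LEAST_ONCE, but a lower-case or misspelled value fails with a bare IllegalArgumentException. A sketch of a more forgiving parse follows. It uses a local stand-in enum whose constants are assumed to match storm-kafka-client's KafkaSpoutConfig.ProcessingGuarantee; GuaranteeParsing is a hypothetical helper, not part of this commit.

```java
import java.util.Arrays;
import java.util.Locale;
import java.util.Map;

// Hypothetical helper, not part of the commit.
final class GuaranteeParsing {
    // Local stand-in; constant names assumed to match
    // KafkaSpoutConfig.ProcessingGuarantee in storm-kafka-client.
    enum Guarantee { AT_LEAST_ONCE, AT_MOST_ONCE, NO_GUARANTEE }

    private GuaranteeParsing() {
    }

    // A missing key falls back to AT_LEAST_ONCE, as in the topology. Unlike a bare
    // valueOf, the value is trimmed and upper-cased, and a failure lists the accepted names.
    static Guarantee parse(Map<String, Object> conf, String key) {
        Object raw = conf.get(key);
        if (raw == null) {
            return Guarantee.AT_LEAST_ONCE;
        }
        String name = raw.toString().trim().toUpperCase(Locale.ROOT);
        try {
            return Guarantee.valueOf(name);
        } catch (IllegalArgumentException e) {
            throw new IllegalArgumentException("Unknown value '" + raw + "' for " + key
                + ", expected one of " + Arrays.toString(Guarantee.values()), e);
        }
    }
}
```

The design choice here is to fail fast with a descriptive message rather than silently defaulting, since a benchmark run under the wrong guarantee would produce misleading numbers.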
http://git-wip-us.apache.org/repos/asf/storm/blob/71496cb6/examples/storm-perf/src/main/java/org/apache/storm/perf/utils/MetricsSample.java
----------------------------------------------------------------------
diff --git a/examples/storm-perf/src/main/java/org/apache/storm/perf/utils/MetricsSample.java b/examples/storm-perf/src/main/java/org/apache/storm/perf/utils/MetricsSample.java
index 9becb0a..f1177b6 100755
--- a/examples/storm-perf/src/main/java/org/apache/storm/perf/utils/MetricsSample.java
+++ b/examples/storm-perf/src/main/java/org/apache/storm/perf/utils/MetricsSample.java
@@ -89,19 +89,19 @@ public class MetricsSample {
// Executor summaries
for(ExecutorSummary executorSummary : executorSummaries){
- ExecutorStats execuatorStats = executorSummary.get_stats();
- if(execuatorStats == null){
+ ExecutorStats executorStats = executorSummary.get_stats();
+ if(executorStats == null){
continue;
}
- ExecutorSpecificStats executorSpecificStats = execuatorStats.get_specific();
+ ExecutorSpecificStats executorSpecificStats = executorStats.get_specific();
if(executorSpecificStats == null){
// bail out
continue;
}
// transferred totals
- Map<String,Map<String,Long>> transferred = execuatorStats.get_transferred();
+ Map<String,Map<String,Long>> transferred = executorStats.get_transferred();
Map<String, Long> txMap = transferred.get(":all-time");
if(txMap == null){
continue;
@@ -137,13 +137,15 @@ public class MetricsSample {
Double total = 0d;
Map<String, Double> vals = spoutStats.get_complete_ms_avg().get(":all-time");
- for(String key : vals.keySet()){
- total += vals.get(key);
+ if (vals != null) {
+ for (String key : vals.keySet()) {
+ total += vals.get(key);
+ }
+ Double latency = total / vals.size();
+ spoutLatencySum += latency;
}
- Double latency = total / vals.size();
spoutExecCount++;
- spoutLatencySum += latency;
}
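The MetricsSample change skips the averaging step when a spout executor has no ":all-time" complete-latency entry, avoiding a NullPointerException. Two details remain: spoutExecCount is still incremented for such executors, so if spoutLatencySum is later divided by spoutExecCount (that code is outside this hunk) they pull the average toward zero; and an empty map would still yield NaN (0.0 / 0). The sketch below is one possible alternative that averages only over executors that reported data. SpoutLatency is a hypothetical helper, and the nested map shape (time window, then per-stream averages) is assumed from how the diff reads get_complete_ms_avg().

```java
import java.util.List;
import java.util.Map;

// Hypothetical helper, not part of storm-perf.
final class SpoutLatency {
    private SpoutLatency() {
    }

    // Each list element is one executor's complete_ms_avg map: window -> (stream -> avg ms).
    // Returns the mean of per-executor averages, counting only executors with data.
    static double averageCompleteLatencyMs(List<Map<String, Map<String, Double>>> perExecutor) {
        double latencySum = 0;
        int executorsWithData = 0;
        for (Map<String, Map<String, Double>> byWindow : perExecutor) {
            Map<String, Double> vals = byWindow.get(":all-time");
            if (vals == null || vals.isEmpty()) {
                continue; // no completed tuples reported yet; skip instead of counting as 0
            }
            double total = 0;
            for (double v : vals.values()) {
                total += v;
            }
            latencySum += total / vals.size();
            executorsWithData++;
        }
        return executorsWithData == 0 ? 0.0 : latencySum / executorsWithData;
    }
}
```

With one executor averaging 15 ms, one with no data, and one averaging 5 ms, this returns 10 ms rather than the roughly 6.7 ms obtained by dividing by all three executors.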
[2/2] storm git commit: Merge branch 'STORM-2933' of https://github.com/srdo/storm into STORM-2933-merge
Posted by ka...@apache.org.
Merge branch 'STORM-2933' of https://github.com/srdo/storm into STORM-2933-merge
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/12cc49fc
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/12cc49fc
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/12cc49fc
Branch: refs/heads/master
Commit: 12cc49fcb99c81cda59281b8f607882f6a4ab537
Parents: b5d70e1 71496cb
Author: Jungtaek Lim <ka...@gmail.com>
Authored: Tue Feb 6 16:27:07 2018 +0900
Committer: Jungtaek Lim <ka...@gmail.com>
Committed: Tue Feb 6 16:27:07 2018 +0900
----------------------------------------------------------------------
examples/storm-perf/README.markdown | 15 +--
examples/storm-perf/pom.xml | 10 ++
.../main/conf/KafkaClientSpoutNullBoltTopo.yml | 20 ++++
.../perf/KafkaClientSpoutNullBoltTopo.java | 113 +++++++++++++++++++
.../apache/storm/perf/utils/MetricsSample.java | 18 +--
5 files changed, 161 insertions(+), 15 deletions(-)
----------------------------------------------------------------------