You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by ka...@apache.org on 2018/02/06 07:27:45 UTC

[1/2] storm git commit: STORM-2933: Add a storm-perf topology that uses storm-kafka-client

Repository: storm
Updated Branches:
  refs/heads/master b5d70e17d -> 12cc49fcb


STORM-2933: Add a storm-perf topology that uses storm-kafka-client


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/71496cb6
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/71496cb6
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/71496cb6

Branch: refs/heads/master
Commit: 71496cb67ee354d1ee7303094891dd503dca97c6
Parents: 7fbe7a2
Author: Stig Rohde Døssing <sr...@apache.org>
Authored: Sat Feb 3 11:04:54 2018 +0100
Committer: Stig Rohde Døssing <sr...@apache.org>
Committed: Sat Feb 3 18:18:27 2018 +0100

----------------------------------------------------------------------
 examples/storm-perf/README.markdown             |  15 +--
 examples/storm-perf/pom.xml                     |  10 ++
 .../main/conf/KafkaClientSpoutNullBoltTopo.yml  |  20 ++++
 .../perf/KafkaClientSpoutNullBoltTopo.java      | 113 +++++++++++++++++++
 .../apache/storm/perf/utils/MetricsSample.java  |  18 +--
 5 files changed, 161 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/71496cb6/examples/storm-perf/README.markdown
----------------------------------------------------------------------
diff --git a/examples/storm-perf/README.markdown b/examples/storm-perf/README.markdown
index 946ab21..8dea58b 100644
--- a/examples/storm-perf/README.markdown
+++ b/examples/storm-perf/README.markdown
@@ -20,14 +20,15 @@ Topologies that measure I/O with external systems are designed to run in cluster
 
 ## Topologies List
 
-1. **ConstSpoutOnlyTopo:** Helps measure how fast spout can emit. This topology has a spout and is not connected to any bolts. Supports in-proc and cluster mode.
-2. **ConstSpoutNullBoltTopo:** Helps measure how fast spout can send data to a bolt. Spout emits a stream of constant values to a DevNull bolt which discards the incoming tuples. Supports in-proc and cluster mode.
-3. **ConstSpoutIdBoltNullBoltTopo:** Helps measure speed of messaging between spouts and bolts. Spout emits a stream of constant values to an ID bolt which clones the tuple and emits it downstream to a DevNull bolt. Supports in-proc and cluster mode.
-4. **FileReadWordCount:** Measures speed of word counting. The spout loads a file into memory and emits these lines in an infinite loop. Supports in-proc and cluster mode.
-5. **HdfsSpoutNullBolt:** Measures speed at which HdfsSpout can read from HDFS. Supports cluster mode only.
+1. **ConstSpoutOnlyTopo:** Helps measure how fast spout can emit. This topology has a spout and is not connected to any bolts. Supports cluster mode only.
+2. **ConstSpoutNullBoltTopo:** Helps measure how fast spout can send data to a bolt. Spout emits a stream of constant values to a DevNull bolt which discards the incoming tuples. Supports cluster mode only.
+3. **ConstSpoutIdBoltNullBoltTopo:** Helps measure speed of messaging between spouts and bolts. Spout emits a stream of constant values to an ID bolt which clones the tuple and emits it downstream to a DevNull bolt. Supports cluster mode only.
+4. **FileReadWordCountTopo:** Measures speed of word counting. The spout loads a file into memory and emits these lines in an infinite loop. Supports cluster mode only.
+5. **HdfsSpoutNullBoltTopo:** Measures speed at which HdfsSpout can read from HDFS. Supports cluster mode only.
 6. **StrGenSpoutHdfsBoltTopo:** Measures speed at which HdfsBolt can write to HDFS. Supports cluster mode only.
-7. **KafkaSpoutNullBolt:** Measures speed at which KafkaSpout can read from Kafka. Supports cluster mode only.
-8. **KafkaHdfsTopo:** Measures how fast Storm can read from Kafka and write to HDFS.
+7. **KafkaSpoutNullBoltTopo:** Measures speed at which the storm-kafka KafkaSpout can read from Kafka. Supports cluster mode only.
+8. **KafkaHdfsTopo:** Measures how fast Storm can read from Kafka and write to HDFS, using the storm-kafka spout. Supports cluster mode only
+9. **KafkaClientSpoutNullBoltTopo:** Measures the speed at which the storm-kafka-client KafkaSpout can read from Kafka. Supports cluster mode only.
 
 
 ## How to run ?

http://git-wip-us.apache.org/repos/asf/storm/blob/71496cb6/examples/storm-perf/pom.xml
----------------------------------------------------------------------
diff --git a/examples/storm-perf/pom.xml b/examples/storm-perf/pom.xml
index 7dc2579..8fad512 100644
--- a/examples/storm-perf/pom.xml
+++ b/examples/storm-perf/pom.xml
@@ -108,6 +108,16 @@
             <artifactId>storm-hdfs</artifactId>
             <version>${project.version}</version>
         </dependency>
+        <dependency>
+            <groupId>org.apache.storm</groupId>
+            <artifactId>storm-kafka-client</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.kafka</groupId>
+            <artifactId>kafka-clients</artifactId>
+            <scope>compile</scope>
+        </dependency>
     </dependencies>
 
 

http://git-wip-us.apache.org/repos/asf/storm/blob/71496cb6/examples/storm-perf/src/main/conf/KafkaClientSpoutNullBoltTopo.yml
----------------------------------------------------------------------
diff --git a/examples/storm-perf/src/main/conf/KafkaClientSpoutNullBoltTopo.yml b/examples/storm-perf/src/main/conf/KafkaClientSpoutNullBoltTopo.yml
new file mode 100644
index 0000000..2fb1881
--- /dev/null
+++ b/examples/storm-perf/src/main/conf/KafkaClientSpoutNullBoltTopo.yml
@@ -0,0 +1,20 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+bootstrap.servers: "127.0.0.1:9092"
+kafka.topic: "storm-perf-null-bolt-topic"
+processing.guarantee: "AT_LEAST_ONCE"
+offset.commit.period.ms: 30000
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/storm/blob/71496cb6/examples/storm-perf/src/main/java/org/apache/storm/perf/KafkaClientSpoutNullBoltTopo.java
----------------------------------------------------------------------
diff --git a/examples/storm-perf/src/main/java/org/apache/storm/perf/KafkaClientSpoutNullBoltTopo.java b/examples/storm-perf/src/main/java/org/apache/storm/perf/KafkaClientSpoutNullBoltTopo.java
new file mode 100644
index 0000000..4d88702
--- /dev/null
+++ b/examples/storm-perf/src/main/java/org/apache/storm/perf/KafkaClientSpoutNullBoltTopo.java
@@ -0,0 +1,113 @@
+/*
+ * Copyright 2018 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.perf;
+
+import java.util.Map;
+import java.util.Optional;
+import org.apache.storm.Config;
+import org.apache.storm.generated.StormTopology;
+import org.apache.storm.kafka.spout.KafkaSpout;
+import org.apache.storm.kafka.spout.KafkaSpoutConfig;
+import org.apache.storm.kafka.spout.KafkaSpoutConfig.ProcessingGuarantee;
+import org.apache.storm.perf.bolt.DevNullBolt;
+import org.apache.storm.perf.utils.Helper;
+import org.apache.storm.topology.TopologyBuilder;
+import org.apache.storm.utils.Utils;
+
+/**
+ * Benchmark topology for measuring spout read/emit/ack performance. The spout reads and emits tuples. The bolt acks and discards received
+ * tuples.
+ */
+public class KafkaClientSpoutNullBoltTopo {
+
+    // configs - topo parallelism
+    public static final String SPOUT_NUM = "spout.count";
+    public static final String BOLT_NUM = "bolt.count";
+
+    // configs - kafka spout
+    public static final String BOOTSTRAP_SERVERS = "bootstrap.servers";
+    public static final String KAFKA_TOPIC = "kafka.topic";
+    public static final String PROCESSING_GUARANTEE = "processing.guarantee";
+    public static final String OFFSET_COMMIT_PERIOD_MS = "offset.commit.period.ms";
+
+    public static final int DEFAULT_SPOUT_NUM = 1;
+    public static final int DEFAULT_BOLT_NUM = 1;
+
+    // names
+    public static final String TOPOLOGY_NAME = KafkaClientSpoutNullBoltTopo.class.getSimpleName();
+    public static final String SPOUT_ID = "kafkaSpout";
+    public static final String BOLT_ID = "devNullBolt";
+
+    /**
+     * Create and configure the topology.
+     */
+    public static StormTopology getTopology(Map<String, Object> config) {
+
+        final int spoutNum = Helper.getInt(config, SPOUT_NUM, DEFAULT_SPOUT_NUM);
+        final int boltNum = Helper.getInt(config, BOLT_NUM, DEFAULT_BOLT_NUM);
+        // 1 -  Setup Kafka Spout   --------
+
+        String bootstrapServers = Optional.ofNullable(Helper.getStr(config, BOOTSTRAP_SERVERS)).orElse("127.0.0.1:9092");
+        String kafkaTopic = Optional.ofNullable(Helper.getStr(config, KAFKA_TOPIC)).orElse("storm-perf-null-bolt-topic");
+        ProcessingGuarantee processingGuarantee = ProcessingGuarantee.valueOf(
+            Optional.ofNullable(Helper.getStr(config, PROCESSING_GUARANTEE))
+                .orElse(ProcessingGuarantee.AT_LEAST_ONCE.name()));
+        int offsetCommitPeriodMs = Helper.getInt(config, OFFSET_COMMIT_PERIOD_MS, 30_000);
+
+        KafkaSpoutConfig<String, String> kafkaSpoutConfig = KafkaSpoutConfig.builder(bootstrapServers, kafkaTopic)
+            .setProcessingGuarantee(processingGuarantee)
+            .setOffsetCommitPeriodMs(offsetCommitPeriodMs)
+            .setFirstPollOffsetStrategy(KafkaSpoutConfig.FirstPollOffsetStrategy.EARLIEST)
+            .setTupleTrackingEnforced(true)
+            .build();
+
+        KafkaSpout<String, String> spout = new KafkaSpout<>(kafkaSpoutConfig);
+
+        // 2 -   DevNull Bolt   --------
+        DevNullBolt bolt = new DevNullBolt();
+
+        // 3 - Setup Topology  --------
+        TopologyBuilder builder = new TopologyBuilder();
+        builder.setSpout(SPOUT_ID, spout, spoutNum);
+        builder.setBolt(BOLT_ID, bolt, boltNum)
+            .localOrShuffleGrouping(SPOUT_ID);
+
+        return builder.createTopology();
+    }
+
+    /**
+     * Start the topology.
+     */
+    public static void main(String[] args) throws Exception {
+        int durationSec = -1;
+        Config topoConf = new Config();
+        if (args.length > 0) {
+            durationSec = Integer.parseInt(args[0]);
+        }
+        if (args.length > 1) {
+            topoConf.putAll(Utils.findAndReadConfigFile(args[1]));
+        }
+        if (args.length > 2) {
+            System.err.println("args: [runDurationSec]  [optionalConfFile]");
+            return;
+        }
+
+        //  Submit to Storm cluster
+        Helper.runOnClusterAndPrintMetrics(durationSec, TOPOLOGY_NAME, topoConf, getTopology(topoConf));
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/71496cb6/examples/storm-perf/src/main/java/org/apache/storm/perf/utils/MetricsSample.java
----------------------------------------------------------------------
diff --git a/examples/storm-perf/src/main/java/org/apache/storm/perf/utils/MetricsSample.java b/examples/storm-perf/src/main/java/org/apache/storm/perf/utils/MetricsSample.java
index 9becb0a..f1177b6 100755
--- a/examples/storm-perf/src/main/java/org/apache/storm/perf/utils/MetricsSample.java
+++ b/examples/storm-perf/src/main/java/org/apache/storm/perf/utils/MetricsSample.java
@@ -89,19 +89,19 @@ public class MetricsSample {
 
         // Executor summaries
         for(ExecutorSummary executorSummary : executorSummaries){
-            ExecutorStats execuatorStats = executorSummary.get_stats();
-            if(execuatorStats == null){
+            ExecutorStats executorStats = executorSummary.get_stats();
+            if(executorStats == null){
                 continue;
             }
 
-            ExecutorSpecificStats executorSpecificStats = execuatorStats.get_specific();
+            ExecutorSpecificStats executorSpecificStats = executorStats.get_specific();
             if(executorSpecificStats == null){
                 // bail out
                 continue;
             }
 
             // transferred totals
-            Map<String,Map<String,Long>> transferred = execuatorStats.get_transferred();
+            Map<String,Map<String,Long>> transferred = executorStats.get_transferred();
             Map<String, Long> txMap = transferred.get(":all-time");
             if(txMap == null){
                 continue;
@@ -137,13 +137,15 @@ public class MetricsSample {
 
                 Double total = 0d;
                 Map<String, Double> vals = spoutStats.get_complete_ms_avg().get(":all-time");
-                for(String key : vals.keySet()){
-                    total += vals.get(key);
+                if (vals != null) {
+                    for (String key : vals.keySet()) {
+                        total += vals.get(key);
+                    }
+                    Double latency = total / vals.size();
+                    spoutLatencySum += latency;
                 }
-                Double latency = total / vals.size();
 
                 spoutExecCount++;
-                spoutLatencySum += latency;
             }
 
 


[2/2] storm git commit: Merge branch 'STORM-2933' of https://github.com/srdo/storm into STORM-2933-merge

Posted by ka...@apache.org.
Merge branch 'STORM-2933' of https://github.com/srdo/storm into STORM-2933-merge


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/12cc49fc
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/12cc49fc
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/12cc49fc

Branch: refs/heads/master
Commit: 12cc49fcb99c81cda59281b8f607882f6a4ab537
Parents: b5d70e1 71496cb
Author: Jungtaek Lim <ka...@gmail.com>
Authored: Tue Feb 6 16:27:07 2018 +0900
Committer: Jungtaek Lim <ka...@gmail.com>
Committed: Tue Feb 6 16:27:07 2018 +0900

----------------------------------------------------------------------
 examples/storm-perf/README.markdown             |  15 +--
 examples/storm-perf/pom.xml                     |  10 ++
 .../main/conf/KafkaClientSpoutNullBoltTopo.yml  |  20 ++++
 .../perf/KafkaClientSpoutNullBoltTopo.java      | 113 +++++++++++++++++++
 .../apache/storm/perf/utils/MetricsSample.java  |  18 +--
 5 files changed, 161 insertions(+), 15 deletions(-)
----------------------------------------------------------------------