You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by ka...@apache.org on 2017/04/05 22:33:59 UTC
[03/14] storm git commit: STORM-2416 Release Packaging Improvements
http://git-wip-us.apache.org/repos/asf/storm/blob/2a369e40/storm-perf/src/main/conf/KafkaHdfsTopo.yaml
----------------------------------------------------------------------
diff --git a/storm-perf/src/main/conf/KafkaHdfsTopo.yaml b/storm-perf/src/main/conf/KafkaHdfsTopo.yaml
deleted file mode 100755
index a8ed2f2..0000000
--- a/storm-perf/src/main/conf/KafkaHdfsTopo.yaml
+++ /dev/null
@@ -1,26 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-spout.count : 1
-bolt.count : 1
-kafka.topic : "kafka_topic"
-zk.uri : "zkhostname:2181"
-hdfs.uri : "hdfs://hdfs.namenode:8020"
-hdfs.dir : "/tmp/storm"
-hdfs.batch : 1000
-
-# storm config overrides
-topology.workers : 1
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/storm/blob/2a369e40/storm-perf/src/main/conf/KafkaSpoutNullBoltTopo.yaml
----------------------------------------------------------------------
diff --git a/storm-perf/src/main/conf/KafkaSpoutNullBoltTopo.yaml b/storm-perf/src/main/conf/KafkaSpoutNullBoltTopo.yaml
deleted file mode 100644
index cde4c2e..0000000
--- a/storm-perf/src/main/conf/KafkaSpoutNullBoltTopo.yaml
+++ /dev/null
@@ -1,23 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-spout.count : 1
-bolt.count : 1
-kafka.topic : "kafka_topic"
-zk.uri : "zkhostname:2181"
-
-# storm config overrides
-topology.workers : 1
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/storm/blob/2a369e40/storm-perf/src/main/conf/StrGenSpoutHdfsBoltTopo.yaml
----------------------------------------------------------------------
diff --git a/storm-perf/src/main/conf/StrGenSpoutHdfsBoltTopo.yaml b/storm-perf/src/main/conf/StrGenSpoutHdfsBoltTopo.yaml
deleted file mode 100644
index d16431b..0000000
--- a/storm-perf/src/main/conf/StrGenSpoutHdfsBoltTopo.yaml
+++ /dev/null
@@ -1,25 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-spout.count : 1
-bolt.count : 1
-hdfs.uri : "hdfs://hdfs.namenode:8020"
-hdfs.dir : "/tmp/storm"
-hdfs.batch : 1000
-
-
-# storm config overrides
-topology.workers : 1
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/storm/blob/2a369e40/storm-perf/src/main/java/org/apache/storm/perf/ConstSpoutIdBoltNullBoltTopo.java
----------------------------------------------------------------------
diff --git a/storm-perf/src/main/java/org/apache/storm/perf/ConstSpoutIdBoltNullBoltTopo.java b/storm-perf/src/main/java/org/apache/storm/perf/ConstSpoutIdBoltNullBoltTopo.java
deleted file mode 100644
index 11c63d3..0000000
--- a/storm-perf/src/main/java/org/apache/storm/perf/ConstSpoutIdBoltNullBoltTopo.java
+++ /dev/null
@@ -1,101 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License
- */
-
-package org.apache.storm.perf;
-
-import org.apache.storm.Config;
-import org.apache.storm.LocalCluster;
-import org.apache.storm.generated.StormTopology;
-import org.apache.storm.perf.bolt.DevNullBolt;
-import org.apache.storm.perf.bolt.IdBolt;
-import org.apache.storm.perf.spout.ConstSpout;
-import org.apache.storm.perf.utils.Helper;
-import org.apache.storm.topology.TopologyBuilder;
-import org.apache.storm.utils.Utils;
-
-import java.util.Map;
-
-/**
- * ConstSpout -> IdBolt -> DevNullBolt
- * This topology measures speed of messaging between spouts->bolt and bolt->bolt
- * ConstSpout : Continuously emits a constant string
- * IdBolt : clones and emits input tuples
- * DevNullBolt : discards incoming tuples
- */
-public class ConstSpoutIdBoltNullBoltTopo {
-
- public static final String TOPOLOGY_NAME = "ConstSpoutIdBoltNullBoltTopo";
- public static final String SPOUT_ID = "constSpout";
- public static final String BOLT1_ID = "idBolt";
- public static final String BOLT2_ID = "nullBolt";
-
- // Configs
- public static final String BOLT1_COUNT = "bolt1.count";
- public static final String BOLT2_COUNT = "bolt2.count";
- public static final String SPOUT_COUNT = "spout.count";
-
- public static StormTopology getTopology(Map conf) {
-
- // 1 - Setup Spout --------
- ConstSpout spout = new ConstSpout("some data").withOutputFields("str");
-
- // 2 - Setup IdBolt & DevNullBolt --------
- IdBolt bolt1 = new IdBolt();
- DevNullBolt bolt2 = new DevNullBolt();
-
-
- // 3 - Setup Topology --------
- TopologyBuilder builder = new TopologyBuilder();
-
- builder.setSpout(SPOUT_ID, spout, Helper.getInt(conf, SPOUT_COUNT, 1) );
-
- builder.setBolt(BOLT1_ID, bolt1, Helper.getInt(conf, BOLT1_COUNT, 1))
- .localOrShuffleGrouping(SPOUT_ID);
-
- builder.setBolt(BOLT2_ID, bolt2, Helper.getInt(conf, BOLT2_COUNT, 1))
- .localOrShuffleGrouping(BOLT1_ID);
-
- return builder.createTopology();
- }
-
-
- public static void main(String[] args) throws Exception {
-
- if (args.length <= 0) {
- // submit to local cluster
- Config conf = new Config();
- LocalCluster cluster = Helper.runOnLocalCluster(TOPOLOGY_NAME, getTopology(conf));
-
- Helper.setupShutdownHook(cluster, TOPOLOGY_NAME);
- while (true) {// run indefinitely till Ctrl-C
- Thread.sleep(20_000_000);
- }
- } else {
- // submit to real cluster
- if (args.length >2) {
- System.err.println("args: runDurationSec [optionalConfFile]");
- return;
- }
- Integer durationSec = Integer.parseInt(args[0]);
- Map topoConf = (args.length==2) ? Utils.findAndReadConfigFile(args[1]) : new Config();
-
- // Submit topology to storm cluster
- Helper.runOnClusterAndPrintMetrics(durationSec, TOPOLOGY_NAME, topoConf, getTopology(topoConf));
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/2a369e40/storm-perf/src/main/java/org/apache/storm/perf/ConstSpoutNullBoltTopo.java
----------------------------------------------------------------------
diff --git a/storm-perf/src/main/java/org/apache/storm/perf/ConstSpoutNullBoltTopo.java b/storm-perf/src/main/java/org/apache/storm/perf/ConstSpoutNullBoltTopo.java
deleted file mode 100755
index 92c2787..0000000
--- a/storm-perf/src/main/java/org/apache/storm/perf/ConstSpoutNullBoltTopo.java
+++ /dev/null
@@ -1,107 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License
- */
-
-package org.apache.storm.perf;
-
-import org.apache.storm.Config;
-import org.apache.storm.LocalCluster;
-import org.apache.storm.generated.StormTopology;
-import org.apache.storm.perf.bolt.DevNullBolt;
-import org.apache.storm.perf.spout.ConstSpout;
-import org.apache.storm.perf.utils.Helper;
-import org.apache.storm.topology.BoltDeclarer;
-import org.apache.storm.topology.TopologyBuilder;
-import org.apache.storm.utils.Utils;
-
-import java.util.Map;
-
-/***
- * This topo helps measure the messaging speed between a spout and a bolt.
- * Spout generates a stream of a fixed string.
- * Bolt will simply ack and discard the tuple received
- */
-
-public class ConstSpoutNullBoltTopo {
-
- public static final String TOPOLOGY_NAME = "ConstSpoutNullBoltTopo";
- public static final String SPOUT_ID = "constSpout";
- public static final String BOLT_ID = "nullBolt";
-
- // Configs
- public static final String BOLT_COUNT = "bolt.count";
- public static final String SPOUT_COUNT = "spout.count";
- public static final String GROUPING = "grouping"; // can be 'local' or 'shuffle'
-
- public static final String LOCAL_GROPING = "local";
- public static final String SHUFFLE_GROUPING = "shuffle";
- public static final String DEFAULT_GROUPING = LOCAL_GROPING;
-
- public static StormTopology getTopology(Map conf) {
-
- // 1 - Setup Spout --------
- ConstSpout spout = new ConstSpout("some data").withOutputFields("str");
-
- // 2 - Setup DevNull Bolt --------
- DevNullBolt bolt = new DevNullBolt();
-
-
- // 3 - Setup Topology --------
- TopologyBuilder builder = new TopologyBuilder();
-
- builder.setSpout(SPOUT_ID, spout, Helper.getInt(conf, SPOUT_COUNT, 1) );
- BoltDeclarer bd = builder.setBolt(BOLT_ID, bolt, Helper.getInt(conf, BOLT_COUNT, 1));
-
- String groupingType = Helper.getStr(conf, GROUPING);
- if(groupingType==null || groupingType.equalsIgnoreCase(DEFAULT_GROUPING) )
- bd.localOrShuffleGrouping(SPOUT_ID);
- else if(groupingType.equalsIgnoreCase(SHUFFLE_GROUPING) )
- bd.shuffleGrouping(SPOUT_ID);
- return builder.createTopology();
- }
-
- /**
- * ConstSpout -> DevNullBolt with configurable grouping (default localOrShuffle)
- */
- public static void main(String[] args) throws Exception {
-
- if(args.length <= 0) {
- // For IDE based profiling ... submit topology to local cluster
- Config conf = new Config();
- final LocalCluster cluster = Helper.runOnLocalCluster(TOPOLOGY_NAME, getTopology(conf));
-
- Helper.setupShutdownHook(cluster, TOPOLOGY_NAME);
- while (true) {// run indefinitely till Ctrl-C
- Thread.sleep(20_000_000);
- }
-
- } else {
- // For measuring perf against a Storm cluster
- if (args.length > 2) {
- System.err.println("args: runDurationSec [optionalConfFile]");
- return;
- }
- Integer durationSec = Integer.parseInt(args[0]);
- Map topoConf = (args.length==2) ? Utils.findAndReadConfigFile(args[1]) : new Config();
-
- // Submit topology to storm cluster
- Helper.runOnClusterAndPrintMetrics(durationSec, TOPOLOGY_NAME, topoConf, getTopology(topoConf));
- }
- }
-
-}
-
http://git-wip-us.apache.org/repos/asf/storm/blob/2a369e40/storm-perf/src/main/java/org/apache/storm/perf/ConstSpoutOnlyTopo.java
----------------------------------------------------------------------
diff --git a/storm-perf/src/main/java/org/apache/storm/perf/ConstSpoutOnlyTopo.java b/storm-perf/src/main/java/org/apache/storm/perf/ConstSpoutOnlyTopo.java
deleted file mode 100755
index 721ae3d..0000000
--- a/storm-perf/src/main/java/org/apache/storm/perf/ConstSpoutOnlyTopo.java
+++ /dev/null
@@ -1,74 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License
- */
-
-package org.apache.storm.perf;
-
-import org.apache.storm.Config;
-import org.apache.storm.LocalCluster;
-import org.apache.storm.generated.StormTopology;
-import org.apache.storm.perf.spout.ConstSpout;
-import org.apache.storm.perf.utils.Helper;
-import org.apache.storm.topology.TopologyBuilder;
-
-
-/***
- * This topo helps measure how fast a spout can produce data (so no bolts are attached)
- * Spout generates a stream of a fixed string.
- */
-
-public class ConstSpoutOnlyTopo {
-
- public static final String TOPOLOGY_NAME = "ConstSpoutOnlyTopo";
- public static final String SPOUT_ID = "constSpout";
-
-
- public static StormTopology getTopology() {
-
- // 1 - Setup Const Spout --------
- ConstSpout spout = new ConstSpout("some data").withOutputFields("str");
-
- // 2 - Setup Topology --------
- TopologyBuilder builder = new TopologyBuilder();
- builder.setSpout(SPOUT_ID, spout, 1);
- return builder.createTopology();
- }
-
- /**
- * ConstSpout only topology (No bolts)
- */
- public static void main(String[] args) throws Exception {
- if(args.length <= 0) {
- // For IDE based profiling ... submit topology to local cluster
- LocalCluster cluster = Helper.runOnLocalCluster(TOPOLOGY_NAME, getTopology());
-
- Helper.setupShutdownHook(cluster, TOPOLOGY_NAME);
- while (true) {// run indefinitely till Ctrl-C
- Thread.sleep(20_000_000);
- }
- } else {
- // Submit topology to storm cluster
- if (args.length != 1) {
- System.err.println("args: runDurationSec");
- return;
- }
- Integer durationSec = Integer.parseInt(args[0]);
-
- Helper.runOnClusterAndPrintMetrics(durationSec, TOPOLOGY_NAME, new Config(), getTopology());
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/2a369e40/storm-perf/src/main/java/org/apache/storm/perf/FileReadWordCountTopo.java
----------------------------------------------------------------------
diff --git a/storm-perf/src/main/java/org/apache/storm/perf/FileReadWordCountTopo.java b/storm-perf/src/main/java/org/apache/storm/perf/FileReadWordCountTopo.java
deleted file mode 100644
index d518c86..0000000
--- a/storm-perf/src/main/java/org/apache/storm/perf/FileReadWordCountTopo.java
+++ /dev/null
@@ -1,96 +0,0 @@
-/*
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements. See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership. The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License. You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License
-*/
-package org.apache.storm.perf;
-
-
-import org.apache.storm.Config;
-import org.apache.storm.LocalCluster;
-import org.apache.storm.generated.StormTopology;
-import org.apache.storm.perf.bolt.CountBolt;
-import org.apache.storm.perf.bolt.SplitSentenceBolt;
-import org.apache.storm.perf.spout.FileReadSpout;
-import org.apache.storm.perf.utils.Helper;
-import org.apache.storm.topology.TopologyBuilder;
-import org.apache.storm.tuple.Fields;
-import org.apache.storm.utils.Utils;
-
-
-import java.util.Map;
-
-/***
- * This topo helps measure speed of word count.
- * Spout loads a file into memory on initialization, then emits the lines in an endless loop.
- */
-
-public class FileReadWordCountTopo {
- public static final String SPOUT_ID = "spout";
- public static final String COUNT_ID = "counter";
- public static final String SPLIT_ID = "splitter";
- public static final String TOPOLOGY_NAME = "FileReadWordCountTopo";
-
- // Config settings
- public static final String SPOUT_NUM = "spout.count";
- public static final String SPLIT_NUM = "splitter.count";
- public static final String COUNT_NUM = "counter.count";
- public static final String INPUT_FILE = "input.file";
-
- public static final int DEFAULT_SPOUT_NUM = 1;
- public static final int DEFAULT_SPLIT_BOLT_NUM = 2;
- public static final int DEFAULT_COUNT_BOLT_NUM = 2;
-
-
- public static StormTopology getTopology(Map config) {
-
- final int spoutNum = Helper.getInt(config, SPOUT_NUM, DEFAULT_SPOUT_NUM);
- final int spBoltNum = Helper.getInt(config, SPLIT_NUM, DEFAULT_SPLIT_BOLT_NUM);
- final int cntBoltNum = Helper.getInt(config, COUNT_NUM, DEFAULT_COUNT_BOLT_NUM);
- final String inputFile = Helper.getStr(config, INPUT_FILE);
-
- TopologyBuilder builder = new TopologyBuilder();
- builder.setSpout(SPOUT_ID, new FileReadSpout(inputFile), spoutNum);
- builder.setBolt(SPLIT_ID, new SplitSentenceBolt(), spBoltNum).localOrShuffleGrouping(SPOUT_ID);
- builder.setBolt(COUNT_ID, new CountBolt(), cntBoltNum).fieldsGrouping(SPLIT_ID, new Fields(SplitSentenceBolt.FIELDS));
-
- return builder.createTopology();
- }
-
- public static void main(String[] args) throws Exception {
- if(args.length <= 0) {
- // For IDE based profiling ... submit topology to local cluster
- Config conf = new Config();
- conf.put(INPUT_FILE, "resources/randomwords.txt");
- LocalCluster cluster = Helper.runOnLocalCluster(TOPOLOGY_NAME, getTopology(conf));
-
- Helper.setupShutdownHook(cluster, TOPOLOGY_NAME);
- while (true) {// run indefinitely till Ctrl-C
- Thread.sleep(20_000_000);
- }
- } else {
- // Submit to Storm cluster
- if (args.length !=2) {
- System.err.println("args: runDurationSec confFile");
- return;
- }
- Integer durationSec = Integer.parseInt(args[0]);
- Map topoConf = Utils.findAndReadConfigFile(args[1]);
-
- Helper.runOnClusterAndPrintMetrics(durationSec, TOPOLOGY_NAME, topoConf, getTopology(topoConf));
-
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/2a369e40/storm-perf/src/main/java/org/apache/storm/perf/HdfsSpoutNullBoltTopo.java
----------------------------------------------------------------------
diff --git a/storm-perf/src/main/java/org/apache/storm/perf/HdfsSpoutNullBoltTopo.java b/storm-perf/src/main/java/org/apache/storm/perf/HdfsSpoutNullBoltTopo.java
deleted file mode 100644
index 248b523..0000000
--- a/storm-perf/src/main/java/org/apache/storm/perf/HdfsSpoutNullBoltTopo.java
+++ /dev/null
@@ -1,101 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.storm.perf;
-
-import org.apache.storm.generated.StormTopology;
-import org.apache.storm.hdfs.spout.HdfsSpout;
-import org.apache.storm.hdfs.spout.TextFileReader;
-import org.apache.storm.perf.bolt.DevNullBolt;
-import org.apache.storm.perf.utils.Helper;
-import org.apache.storm.topology.TopologyBuilder;
-import org.apache.storm.utils.Utils;
-
-import java.util.Map;
-
-/***
- * This topo helps measure speed of reading from Hdfs.
- * Spout Reads from Hdfs.
- * Bolt acks and discards tuples
- */
-
-
-public class HdfsSpoutNullBoltTopo {
- // names
- static final String TOPOLOGY_NAME = "HdfsSpoutNullBoltTopo";
- static final String SPOUT_ID = "hdfsSpout";
- static final String BOLT_ID = "devNullBolt";
-
- // configs
- static final String SPOUT_NUM = "spout.count";
- static final String BOLT_NUM = "bolt.count";
-
- static final String HDFS_URI = "hdfs.uri";
- static final String SOURCE_DIR = "hdfs.source.dir";
- static final String ARCHIVE_DIR = "hdfs.archive.dir";
- static final String BAD_DIR = "hdfs.bad.dir";
-
- public static final int DEFAULT_SPOUT_NUM = 1;
- public static final int DEFAULT_BOLT_NUM = 1;
-
-
- public static StormTopology getTopology(Map config) {
-
- final int spoutNum = Helper.getInt(config, SPOUT_NUM, DEFAULT_SPOUT_NUM);
- final int boltNum = Helper.getInt(config, BOLT_NUM, DEFAULT_BOLT_NUM);
- final String fileFormat = Helper.getStr(config, "text");
- final String hdfsUri = Helper.getStr(config, HDFS_URI);
- final String sourceDir = Helper.getStr(config, SOURCE_DIR);
- final String archiveDir = Helper.getStr(config, ARCHIVE_DIR);
- final String badDir = Helper.getStr(config, BAD_DIR);
-
-
- // 1 - Setup Hdfs Spout --------
- HdfsSpout spout = new HdfsSpout()
- .setReaderType(fileFormat)
- .setHdfsUri(hdfsUri)
- .setSourceDir(sourceDir)
- .setArchiveDir(archiveDir)
- .setBadFilesDir(badDir)
- .withOutputFields(TextFileReader.defaultFields);
-
- // 2 - DevNull Bolt --------
- DevNullBolt bolt = new DevNullBolt();
-
- // 3 - Setup Topology --------
- TopologyBuilder builder = new TopologyBuilder();
- builder.setSpout(SPOUT_ID, spout, spoutNum);
- builder.setBolt(BOLT_ID, bolt, boltNum)
- .localOrShuffleGrouping(SPOUT_ID);
-
- return builder.createTopology();
- }
-
- public static void main(String[] args) throws Exception {
- if (args.length != 2) {
- System.err.println("args: runDurationSec topConfFile");
- return;
- }
-
- Integer durationSec = Integer.parseInt(args[0]);
- Map topoConf = Utils.findAndReadConfigFile(args[1]);
-
- // Submit to Storm cluster
- Helper.runOnClusterAndPrintMetrics(durationSec, TOPOLOGY_NAME, topoConf, getTopology(topoConf));
- }
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/2a369e40/storm-perf/src/main/java/org/apache/storm/perf/KafkaHdfsTopo.java
----------------------------------------------------------------------
diff --git a/storm-perf/src/main/java/org/apache/storm/perf/KafkaHdfsTopo.java b/storm-perf/src/main/java/org/apache/storm/perf/KafkaHdfsTopo.java
deleted file mode 100755
index 4293aac..0000000
--- a/storm-perf/src/main/java/org/apache/storm/perf/KafkaHdfsTopo.java
+++ /dev/null
@@ -1,168 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License
- */
-
-package org.apache.storm.perf;
-
-import org.apache.storm.generated.StormTopology;
-import org.apache.storm.hdfs.bolt.HdfsBolt;
-import org.apache.storm.hdfs.bolt.format.DefaultFileNameFormat;
-import org.apache.storm.hdfs.bolt.format.FileNameFormat;
-import org.apache.storm.hdfs.bolt.format.RecordFormat;
-import org.apache.storm.hdfs.bolt.rotation.FileRotationPolicy;
-import org.apache.storm.hdfs.bolt.rotation.FileSizeRotationPolicy;
-import org.apache.storm.hdfs.bolt.sync.CountSyncPolicy;
-import org.apache.storm.hdfs.bolt.sync.SyncPolicy;
-import org.apache.storm.kafka.BrokerHosts;
-import org.apache.storm.kafka.KafkaSpout;
-import org.apache.storm.kafka.SpoutConfig;
-import org.apache.storm.kafka.StringMultiSchemeWithTopic;
-import org.apache.storm.kafka.ZkHosts;
-import org.apache.storm.perf.utils.Helper;
-import org.apache.storm.topology.TopologyBuilder;
-import org.apache.storm.tuple.Tuple;
-import org.apache.storm.utils.Utils;
-
-import java.util.Map;
-import java.util.UUID;
-
-/***
- * This topo helps measure speed of reading from Kafka and writing to Hdfs.
- * Spout Reads from Kafka.
- * Bolt writes to Hdfs
- */
-
-public class KafkaHdfsTopo {
-
- // configs - topo parallelism
- public static final String SPOUT_NUM = "spout.count";
- public static final String BOLT_NUM = "bolt.count";
- // configs - kafka spout
- public static final String KAFKA_TOPIC = "kafka.topic";
- public static final String ZOOKEEPER_URI = "zk.uri";
- // configs - hdfs bolt
- public static final String HDFS_URI = "hdfs.uri";
- public static final String HDFS_PATH = "hdfs.dir";
- public static final String HDFS_BATCH = "hdfs.batch";
-
-
- public static final int DEFAULT_SPOUT_NUM = 1;
- public static final int DEFAULT_BOLT_NUM = 1;
- public static final int DEFAULT_HDFS_BATCH = 1000;
-
- // names
- public static final String TOPOLOGY_NAME = "KafkaHdfsTopo";
- public static final String SPOUT_ID = "kafkaSpout";
- public static final String BOLT_ID = "hdfsBolt";
-
-
-
- public static StormTopology getTopology(Map config) {
-
- final int spoutNum = getInt(config, SPOUT_NUM, DEFAULT_SPOUT_NUM);
- final int boltNum = getInt(config, BOLT_NUM, DEFAULT_BOLT_NUM);
-
- final int hdfsBatch = getInt(config, HDFS_BATCH, DEFAULT_HDFS_BATCH);
-
- // 1 - Setup Kafka Spout --------
- String zkConnString = getStr(config, ZOOKEEPER_URI);
- String topicName = getStr(config, KAFKA_TOPIC);
-
- BrokerHosts brokerHosts = new ZkHosts(zkConnString);
- SpoutConfig spoutConfig = new SpoutConfig(brokerHosts, topicName, "/" + topicName, UUID.randomUUID().toString());
- spoutConfig.scheme = new StringMultiSchemeWithTopic();
- spoutConfig.ignoreZkOffsets = true;
-
- KafkaSpout spout = new KafkaSpout(spoutConfig);
-
- // 2 - Setup HFS Bolt --------
- String Hdfs_url = getStr(config, HDFS_URI);
- RecordFormat format = new LineWriter("str");
- SyncPolicy syncPolicy = new CountSyncPolicy(hdfsBatch);
- FileRotationPolicy rotationPolicy = new FileSizeRotationPolicy(1.0f, FileSizeRotationPolicy.Units.GB);
-
- FileNameFormat fileNameFormat = new DefaultFileNameFormat().withPath(getStr(config,HDFS_PATH) );
-
- // Instantiate the HdfsBolt
- HdfsBolt bolt = new HdfsBolt()
- .withFsUrl(Hdfs_url)
- .withFileNameFormat(fileNameFormat)
- .withRecordFormat(format)
- .withRotationPolicy(rotationPolicy)
- .withSyncPolicy(syncPolicy);
-
-
- // 3 - Setup Topology --------
- TopologyBuilder builder = new TopologyBuilder();
- builder.setSpout(SPOUT_ID, spout, spoutNum);
- builder.setBolt(BOLT_ID, bolt, boltNum)
- .localOrShuffleGrouping(SPOUT_ID);
-
- return builder.createTopology();
- }
-
-
- public static int getInt(Map map, Object key, int def) {
- return Utils.getInt(Utils.get(map, key, def));
- }
-
- public static String getStr(Map map, Object key) {
- return (String) map.get(key);
- }
-
-
- /** Copies text file content from sourceDir to destinationDir. Moves source files into sourceDir after its done consuming */
- public static void main(String[] args) throws Exception {
-
- if (args.length != 2) {
- System.err.println("args: runDurationSec topConfFile");
- return;
- }
-
- Integer durationSec = Integer.parseInt(args[0]);
- String confFile = args[1];
- Map topoConf = Utils.findAndReadConfigFile(confFile);
-
- // Submit topology to Storm cluster
- Helper.runOnClusterAndPrintMetrics(durationSec, TOPOLOGY_NAME, topoConf, getTopology(topoConf));
- }
-
- public static class LineWriter implements RecordFormat {
- private String lineDelimiter = System.lineSeparator();
- private String fieldName;
-
- public LineWriter(String fieldName) {
- this.fieldName = fieldName;
- }
-
- /**
- * Overrides the default record delimiter.
- *
- * @param delimiter
- * @return
- */
- public LineWriter withLineDelimiter(String delimiter){
- this.lineDelimiter = delimiter;
- return this;
- }
-
- @Override
- public byte[] format(Tuple tuple) {
- return (tuple.getValueByField(fieldName).toString() + this.lineDelimiter).getBytes();
- }
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/storm/blob/2a369e40/storm-perf/src/main/java/org/apache/storm/perf/KafkaSpoutNullBoltTopo.java
----------------------------------------------------------------------
diff --git a/storm-perf/src/main/java/org/apache/storm/perf/KafkaSpoutNullBoltTopo.java b/storm-perf/src/main/java/org/apache/storm/perf/KafkaSpoutNullBoltTopo.java
deleted file mode 100755
index 3512c65..0000000
--- a/storm-perf/src/main/java/org/apache/storm/perf/KafkaSpoutNullBoltTopo.java
+++ /dev/null
@@ -1,114 +0,0 @@
-/*
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements. See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership. The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License. You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License
-*/
-
-package org.apache.storm.perf;
-
-import org.apache.storm.generated.StormTopology;
-import org.apache.storm.kafka.BrokerHosts;
-import org.apache.storm.kafka.KafkaSpout;
-import org.apache.storm.kafka.SpoutConfig;
-import org.apache.storm.kafka.StringMultiSchemeWithTopic;
-import org.apache.storm.kafka.ZkHosts;
-import org.apache.storm.perf.bolt.DevNullBolt;
-import org.apache.storm.perf.utils.Helper;
-import org.apache.storm.topology.TopologyBuilder;
-import org.apache.storm.utils.Utils;
-
-import java.util.Map;
-import java.util.UUID;
-
-
-/***
- * This topo helps measure speed of reading from Kafka
- * Spout Reads from Kafka.
- * Bolt acks and discards tuples
- */
-
-public class KafkaSpoutNullBoltTopo {
-
- // configs - topo parallelism
- public static final String SPOUT_NUM = "spout.count";
- public static final String BOLT_NUM = "bolt.count";
-
- // configs - kafka spout
- public static final String KAFKA_TOPIC = "kafka.topic";
- public static final String ZOOKEEPER_URI = "zk.uri";
-
-
- public static final int DEFAULT_SPOUT_NUM = 1;
- public static final int DEFAULT_BOLT_NUM = 1;
-
- // names
- public static final String TOPOLOGY_NAME = "KafkaSpoutNullBoltTopo";
- public static final String SPOUT_ID = "kafkaSpout";
- public static final String BOLT_ID = "devNullBolt";
-
-
- public static StormTopology getTopology(Map config) {
-
- final int spoutNum = getInt(config, SPOUT_NUM, DEFAULT_SPOUT_NUM);
- final int boltNum = getInt(config, BOLT_NUM, DEFAULT_BOLT_NUM);
- // 1 - Setup Kafka Spout --------
-
- String zkConnString = getStr(config, ZOOKEEPER_URI);
- String topicName = getStr(config, KAFKA_TOPIC);
-
- BrokerHosts brokerHosts = new ZkHosts(zkConnString);
- SpoutConfig spoutConfig = new SpoutConfig(brokerHosts, topicName, "/" + topicName, UUID.randomUUID().toString());
- spoutConfig.scheme = new StringMultiSchemeWithTopic();
- spoutConfig.ignoreZkOffsets = true;
-
- KafkaSpout spout = new KafkaSpout(spoutConfig);
-
- // 2 - DevNull Bolt --------
- DevNullBolt bolt = new DevNullBolt();
-
- // 3 - Setup Topology --------
- TopologyBuilder builder = new TopologyBuilder();
- builder.setSpout(SPOUT_ID, spout, spoutNum);
- builder.setBolt(BOLT_ID, bolt, boltNum)
- .localOrShuffleGrouping(SPOUT_ID);
-
- return builder.createTopology();
- }
-
-
- public static int getInt(Map map, Object key, int def) {
- return Utils.getInt(Utils.get(map, key, def));
- }
-
- public static String getStr(Map map, Object key) {
- return (String) map.get(key);
- }
-
-
- /**
- * Copies text file content from sourceDir to destinationDir. Moves source files into sourceDir after its done consuming
- */
- public static void main(String[] args) throws Exception {
- if (args.length !=2) {
- System.err.println("args: runDurationSec confFile");
- return;
- }
- Integer durationSec = Integer.parseInt(args[0]);
- Map topoConf = Utils.findAndReadConfigFile(args[1]);
-
- // Submit to Storm cluster
- Helper.runOnClusterAndPrintMetrics(durationSec, TOPOLOGY_NAME, topoConf, getTopology(topoConf));
- }
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/2a369e40/storm-perf/src/main/java/org/apache/storm/perf/StrGenSpoutHdfsBoltTopo.java
----------------------------------------------------------------------
diff --git a/storm-perf/src/main/java/org/apache/storm/perf/StrGenSpoutHdfsBoltTopo.java b/storm-perf/src/main/java/org/apache/storm/perf/StrGenSpoutHdfsBoltTopo.java
deleted file mode 100755
index 5b97540..0000000
--- a/storm-perf/src/main/java/org/apache/storm/perf/StrGenSpoutHdfsBoltTopo.java
+++ /dev/null
@@ -1,154 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License
- */
-
-
-package org.apache.storm.perf;
-
-import org.apache.storm.LocalCluster;
-import org.apache.storm.generated.StormTopology;
-import org.apache.storm.hdfs.bolt.HdfsBolt;
-import org.apache.storm.hdfs.bolt.format.DefaultFileNameFormat;
-import org.apache.storm.hdfs.bolt.format.FileNameFormat;
-import org.apache.storm.hdfs.bolt.format.RecordFormat;
-import org.apache.storm.hdfs.bolt.rotation.FileRotationPolicy;
-import org.apache.storm.hdfs.bolt.rotation.FileSizeRotationPolicy;
-import org.apache.storm.hdfs.bolt.sync.CountSyncPolicy;
-import org.apache.storm.hdfs.bolt.sync.SyncPolicy;
-import org.apache.storm.perf.spout.StringGenSpout;
-import org.apache.storm.perf.utils.Helper;
-import org.apache.storm.topology.TopologyBuilder;
-import org.apache.storm.tuple.Tuple;
-import org.apache.storm.utils.Utils;
-
-import java.util.Map;
-
-/***
- * This topo helps measure speed of writing to Hdfs
- * Spout generates fixed length random strings.
- * Bolt writes to Hdfs
- */
-
-public class StrGenSpoutHdfsBoltTopo {
-
- // configs - topo parallelism
- public static final String SPOUT_NUM = "spout.count";
- public static final String BOLT_NUM = "bolt.count";
-
- // configs - hdfs bolt
- public static final String HDFS_URI = "hdfs.uri";
- public static final String HDFS_PATH = "hdfs.dir";
- public static final String HDFS_BATCH = "hdfs.batch";
-
- public static final int DEFAULT_SPOUT_NUM = 1;
- public static final int DEFAULT_BOLT_NUM = 1;
- public static final int DEFAULT_HDFS_BATCH = 1000;
-
- // names
- public static final String TOPOLOGY_NAME = "StrGenSpoutHdfsBoltTopo";
- public static final String SPOUT_ID = "GenSpout";
- public static final String BOLT_ID = "hdfsBolt";
-
-
- public static StormTopology getTopology(Map topoConf) {
- final int hdfsBatch = Helper.getInt(topoConf, HDFS_BATCH, DEFAULT_HDFS_BATCH);
-
- // 1 - Setup StringGen Spout --------
- StringGenSpout spout = new StringGenSpout(100).withFieldName("str");
-
-
- // 2 - Setup HFS Bolt --------
- String Hdfs_url = Helper.getStr(topoConf, HDFS_URI);
- RecordFormat format = new LineWriter("str");
- SyncPolicy syncPolicy = new CountSyncPolicy(hdfsBatch);
- FileRotationPolicy rotationPolicy = new FileSizeRotationPolicy(1.0f, FileSizeRotationPolicy.Units.GB);
- final int spoutNum = Helper.getInt(topoConf, SPOUT_NUM, DEFAULT_SPOUT_NUM);
- final int boltNum = Helper.getInt(topoConf, BOLT_NUM, DEFAULT_BOLT_NUM);
-
- // Use default, Storm-generated file names
- FileNameFormat fileNameFormat = new DefaultFileNameFormat().withPath(Helper.getStr(topoConf, HDFS_PATH) );
-
- // Instantiate the HdfsBolt
- HdfsBolt bolt = new HdfsBolt()
- .withFsUrl(Hdfs_url)
- .withFileNameFormat(fileNameFormat)
- .withRecordFormat(format)
- .withRotationPolicy(rotationPolicy)
- .withSyncPolicy(syncPolicy);
-
-
- // 3 - Setup Topology --------
-
- TopologyBuilder builder = new TopologyBuilder();
- builder.setSpout(SPOUT_ID, spout, spoutNum);
- builder.setBolt(BOLT_ID, bolt, boltNum)
- .localOrShuffleGrouping(SPOUT_ID);
-
- return builder.createTopology();
- }
-
-
- /** Spout generates random strings and HDFS bolt writes them to a text file */
- public static void main(String[] args) throws Exception {
- if(args.length <= 0) {
- // submit to local cluster
- Map topoConf = Utils.findAndReadConfigFile("conf/HdfsSpoutTopo.yaml");
- LocalCluster cluster = Helper.runOnLocalCluster(TOPOLOGY_NAME, getTopology(topoConf));
-
- Helper.setupShutdownHook(cluster, TOPOLOGY_NAME);
- while (true) {// run indefinitely till Ctrl-C
- Thread.sleep(20_000_000);
- }
- } else {
- // Submit to Storm cluster
- if (args.length !=2) {
- System.err.println("args: runDurationSec confFile");
- return;
- }
- Integer durationSec = Integer.parseInt(args[0]);
- Map topoConf = Utils.findAndReadConfigFile(args[1]);
-
- Helper.runOnClusterAndPrintMetrics(durationSec, TOPOLOGY_NAME, topoConf, getTopology(topoConf));
- }
- }
-
-
- public static class LineWriter implements RecordFormat {
- private String lineDelimiter = System.lineSeparator();
- private String fieldName;
-
- public LineWriter(String fieldName) {
- this.fieldName = fieldName;
- }
-
- /**
- * Overrides the default record delimiter.
- *
- * @param delimiter
- * @return
- */
- public LineWriter withLineDelimiter(String delimiter){
- this.lineDelimiter = delimiter;
- return this;
- }
-
- public byte[] format(Tuple tuple) {
- return (tuple.getValueByField(fieldName).toString() + this.lineDelimiter).getBytes();
- }
- }
-
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/storm/blob/2a369e40/storm-perf/src/main/java/org/apache/storm/perf/bolt/CountBolt.java
----------------------------------------------------------------------
diff --git a/storm-perf/src/main/java/org/apache/storm/perf/bolt/CountBolt.java b/storm-perf/src/main/java/org/apache/storm/perf/bolt/CountBolt.java
deleted file mode 100644
index b79a0ee..0000000
--- a/storm-perf/src/main/java/org/apache/storm/perf/bolt/CountBolt.java
+++ /dev/null
@@ -1,58 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License
- */
-
-package org.apache.storm.perf.bolt;
-
-
-import org.apache.storm.task.TopologyContext;
-import org.apache.storm.topology.BasicOutputCollector;
-import org.apache.storm.topology.OutputFieldsDeclarer;
-import org.apache.storm.topology.base.BaseBasicBolt;
-import org.apache.storm.tuple.Fields;
-import org.apache.storm.tuple.Tuple;
-import org.apache.storm.tuple.Values;
-
-import java.util.HashMap;
-import java.util.Map;
-
-public class CountBolt extends BaseBasicBolt {
- public static final String FIELDS_WORD = "word";
- public static final String FIELDS_COUNT = "count";
-
- Map<String, Integer> counts = new HashMap<>();
-
- @Override
- public void prepare(Map stormConf, TopologyContext context) {
- }
-
- @Override
- public void execute(Tuple tuple, BasicOutputCollector collector) {
- String word = tuple.getString(0);
- Integer count = counts.get(word);
- if (count == null)
- count = 0;
- count++;
- counts.put(word, count);
- collector.emit(new Values(word, count));
- }
-
- @Override
- public void declareOutputFields(OutputFieldsDeclarer declarer) {
- declarer.declare(new Fields(FIELDS_WORD, FIELDS_COUNT));
- }
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/2a369e40/storm-perf/src/main/java/org/apache/storm/perf/bolt/DevNullBolt.java
----------------------------------------------------------------------
diff --git a/storm-perf/src/main/java/org/apache/storm/perf/bolt/DevNullBolt.java b/storm-perf/src/main/java/org/apache/storm/perf/bolt/DevNullBolt.java
deleted file mode 100755
index b85ce15..0000000
--- a/storm-perf/src/main/java/org/apache/storm/perf/bolt/DevNullBolt.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License
- */
-
-package org.apache.storm.perf.bolt;
-
-import org.apache.storm.task.OutputCollector;
-import org.apache.storm.task.TopologyContext;
-import org.apache.storm.topology.OutputFieldsDeclarer;
-import org.apache.storm.topology.base.BaseRichBolt;
-import org.apache.storm.tuple.Tuple;
-
-import java.util.Map;
-
-
-public class DevNullBolt extends BaseRichBolt {
- private OutputCollector collector;
-
- @Override
- public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
- this.collector = collector;
- }
-
- @Override
- public void execute(Tuple tuple) {
- collector.ack(tuple);
- }
-
- @Override
- public void declareOutputFields(OutputFieldsDeclarer declarer) {
-
- }
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/2a369e40/storm-perf/src/main/java/org/apache/storm/perf/bolt/IdBolt.java
----------------------------------------------------------------------
diff --git a/storm-perf/src/main/java/org/apache/storm/perf/bolt/IdBolt.java b/storm-perf/src/main/java/org/apache/storm/perf/bolt/IdBolt.java
deleted file mode 100644
index 116265e..0000000
--- a/storm-perf/src/main/java/org/apache/storm/perf/bolt/IdBolt.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License
- */
-
-package org.apache.storm.perf.bolt;
-
-import org.apache.storm.task.OutputCollector;
-import org.apache.storm.task.TopologyContext;
-import org.apache.storm.topology.OutputFieldsDeclarer;
-import org.apache.storm.topology.base.BaseRichBolt;
-import org.apache.storm.tuple.Fields;
-import org.apache.storm.tuple.Tuple;
-import org.apache.storm.tuple.Values;
-
-import java.util.Map;
-
-public class IdBolt extends BaseRichBolt {
- private OutputCollector collector;
-
- @Override
- public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
- this.collector = collector;
- }
-
- @Override
- public void execute(Tuple tuple) {
- collector.emit(tuple, new Values( tuple.getValues() ) );
- collector.ack(tuple);
- }
-
- @Override
- public void declareOutputFields(OutputFieldsDeclarer declarer) {
- declarer.declare(new Fields("field1"));
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/storm/blob/2a369e40/storm-perf/src/main/java/org/apache/storm/perf/bolt/SplitSentenceBolt.java
----------------------------------------------------------------------
diff --git a/storm-perf/src/main/java/org/apache/storm/perf/bolt/SplitSentenceBolt.java b/storm-perf/src/main/java/org/apache/storm/perf/bolt/SplitSentenceBolt.java
deleted file mode 100644
index 96f9f73..0000000
--- a/storm-perf/src/main/java/org/apache/storm/perf/bolt/SplitSentenceBolt.java
+++ /dev/null
@@ -1,58 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License
- */
-
-package org.apache.storm.perf.bolt;
-
-import org.apache.storm.task.TopologyContext;
-import org.apache.storm.topology.BasicOutputCollector;
-import org.apache.storm.topology.OutputFieldsDeclarer;
-import org.apache.storm.topology.base.BaseBasicBolt;
-import org.apache.storm.tuple.Fields;
-import org.apache.storm.tuple.Tuple;
-import org.apache.storm.tuple.Values;
-
-import java.util.Map;
-
-
-public class SplitSentenceBolt extends BaseBasicBolt {
- public static final String FIELDS = "word";
-
- @Override
- public void prepare(Map stormConf, TopologyContext context) {
- }
-
- @Override
- public void execute(Tuple input, BasicOutputCollector collector) {
- for (String word : splitSentence(input.getString(0))) {
- collector.emit(new Values(word));
- }
- }
-
- @Override
- public void declareOutputFields(OutputFieldsDeclarer declarer) {
- declarer.declare(new Fields(FIELDS));
- }
-
-
- public static String[] splitSentence(String sentence) {
- if (sentence != null) {
- return sentence.split("\\s+");
- }
- return null;
- }
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/2a369e40/storm-perf/src/main/java/org/apache/storm/perf/spout/ConstSpout.java
----------------------------------------------------------------------
diff --git a/storm-perf/src/main/java/org/apache/storm/perf/spout/ConstSpout.java b/storm-perf/src/main/java/org/apache/storm/perf/spout/ConstSpout.java
deleted file mode 100755
index b66e4f3..0000000
--- a/storm-perf/src/main/java/org/apache/storm/perf/spout/ConstSpout.java
+++ /dev/null
@@ -1,70 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License
- */
-
-package org.apache.storm.perf.spout;
-
-
-import org.apache.storm.spout.SpoutOutputCollector;
-import org.apache.storm.task.TopologyContext;
-import org.apache.storm.topology.OutputFieldsDeclarer;
-import org.apache.storm.topology.base.BaseRichSpout;
-import org.apache.storm.tuple.Fields;
-
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-
-public class ConstSpout extends BaseRichSpout {
-
- private static final String DEFAUT_FIELD_NAME = "str";
- private String value;
- private String fieldName = DEFAUT_FIELD_NAME;
- private SpoutOutputCollector collector = null;
- private int count=0;
-
- public ConstSpout(String value) {
- this.value = value;
- }
-
- public ConstSpout withOutputFields(String fieldName) {
- this.fieldName = fieldName;
- return this;
- }
-
- @Override
- public void declareOutputFields(OutputFieldsDeclarer declarer) {
- declarer.declare(new Fields(fieldName));
- }
-
- @Override
- public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
- this.collector = collector;
- }
-
- @Override
- public void nextTuple() {
- List<Object> tuple = Collections.singletonList((Object) value);
- collector.emit(tuple, count++);
- }
-
- @Override
- public void ack(Object msgId) {
- super.ack(msgId);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/2a369e40/storm-perf/src/main/java/org/apache/storm/perf/spout/FileReadSpout.java
----------------------------------------------------------------------
diff --git a/storm-perf/src/main/java/org/apache/storm/perf/spout/FileReadSpout.java b/storm-perf/src/main/java/org/apache/storm/perf/spout/FileReadSpout.java
deleted file mode 100644
index 959e7c6..0000000
--- a/storm-perf/src/main/java/org/apache/storm/perf/spout/FileReadSpout.java
+++ /dev/null
@@ -1,141 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License
- */
-
-package org.apache.storm.perf.spout;
-
-import org.apache.storm.spout.SpoutOutputCollector;
-import org.apache.storm.task.TopologyContext;
-import org.apache.storm.topology.OutputFieldsDeclarer;
-import org.apache.storm.topology.base.BaseRichSpout;
-import org.apache.storm.tuple.Fields;
-import org.apache.storm.tuple.Values;
-
-import java.io.BufferedReader;
-import java.io.FileInputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.InputStreamReader;
-import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-
-public class FileReadSpout extends BaseRichSpout {
- public static final String FIELDS = "sentence";
- private static final long serialVersionUID = -2582705611472467172L;
- private transient FileReader reader;
- private String file;
- private boolean ackEnabled = true;
- private SpoutOutputCollector collector;
-
- private long count = 0;
-
-
- public FileReadSpout(String file) {
- this.file = file;
- }
-
- // For testing
- FileReadSpout(FileReader reader) {
- this.reader = reader;
- }
-
- @Override
- public void open(Map conf, TopologyContext context,
- SpoutOutputCollector collector) {
- this.collector = collector;
- Object ackObj = conf.get("topology.acker.executors");
- if (ackObj != null && ackObj.equals(0)) {
- this.ackEnabled = false;
- }
- // for tests, reader will not be null
- if (this.reader == null) {
- this.reader = new FileReader(this.file);
- }
- }
-
- @Override
- public void nextTuple() {
- if (ackEnabled) {
- collector.emit(new Values(reader.nextLine()), count);
- count++;
- } else {
- collector.emit(new Values(reader.nextLine()));
- }
- }
-
- @Override
- public void declareOutputFields(OutputFieldsDeclarer declarer) {
- declarer.declare(new Fields(FIELDS));
- }
-
- public static List<String> readLines(InputStream input) {
- List<String> lines = new ArrayList<>();
- try {
- BufferedReader reader = new BufferedReader(new InputStreamReader(input));
- try {
- String line;
- while ((line = reader.readLine()) != null) {
- lines.add(line);
- }
- } catch (IOException e) {
- throw new RuntimeException("Reading file failed", e);
- } finally {
- reader.close();
- }
- } catch (IOException e) {
- throw new RuntimeException("Error closing reader", e);
- }
- return lines;
- }
-
- public static class FileReader implements Serializable {
-
- private static final long serialVersionUID = -7012334600647556267L;
-
- public final String file;
- private List<String> contents = null;
- private int index = 0;
- private int limit = 0;
-
- public FileReader(String file) {
- this.file = file;
- if (this.file != null) {
- try {
- this.contents = readLines(new FileInputStream(this.file));
- } catch (IOException e) {
- e.printStackTrace();
- throw new IllegalArgumentException("Cannot open file " + file, e);
- }
- this.limit = contents.size();
- } else {
- throw new IllegalArgumentException("file name cannot be null");
- }
- }
-
- public String nextLine() {
- if (index >= limit) {
- index = 0;
- }
- String line = contents.get(index);
- index++;
- return line;
- }
-
- }
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/2a369e40/storm-perf/src/main/java/org/apache/storm/perf/spout/StringGenSpout.java
----------------------------------------------------------------------
diff --git a/storm-perf/src/main/java/org/apache/storm/perf/spout/StringGenSpout.java b/storm-perf/src/main/java/org/apache/storm/perf/spout/StringGenSpout.java
deleted file mode 100755
index f9c665b..0000000
--- a/storm-perf/src/main/java/org/apache/storm/perf/spout/StringGenSpout.java
+++ /dev/null
@@ -1,93 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License
- */
-
-package org.apache.storm.perf.spout;
-
-
-import org.apache.commons.lang.RandomStringUtils;
-import org.apache.storm.spout.SpoutOutputCollector;
-import org.apache.storm.task.TopologyContext;
-import org.apache.storm.topology.OutputFieldsDeclarer;
-import org.apache.storm.topology.base.BaseRichSpout;
-import org.apache.storm.tuple.Fields;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-
-/** Spout pre-computes a list with 30k fixed length random strings.
- * Emits sequentially from this list, over and over again.
- */
-
-public class StringGenSpout extends BaseRichSpout {
-
- private static final String DEFAULT_FIELD_NAME = "str";
- private int strLen;
- private final int strCount = 30_000;
- private String fieldName = DEFAULT_FIELD_NAME;
- private SpoutOutputCollector collector = null;
- ArrayList<String> records;
- private int curr=0;
- private int count=0;
-
- public StringGenSpout(int strLen) {
- this.strLen = strLen;
- }
-
- public StringGenSpout withFieldName(String fieldName) {
- this.fieldName = fieldName;
- return this;
- }
-
- @Override
- public void declareOutputFields(OutputFieldsDeclarer declarer) {
- declarer.declare( new Fields(fieldName) );
- }
-
- @Override
- public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
- this.records = genStringList(strLen, strCount);
-
- this.collector = collector;
- }
-
- private static ArrayList<String> genStringList(int strLen, int count) {
- ArrayList<String> result = new ArrayList<String>(count);
- for (int i = 0; i < count; i++) {
- result.add( RandomStringUtils.random(strLen) );
- }
- return result;
- }
-
- @Override
- public void nextTuple() {
- List<Object> tuple;
- if( curr < strCount ) {
- tuple = Collections.singletonList((Object) records.get(curr));
- ++curr;
- collector.emit(tuple, ++count);
- }
- }
-
-
- @Override
- public void ack(Object msgId) {
- super.ack(msgId);
- }
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/2a369e40/storm-perf/src/main/java/org/apache/storm/perf/utils/BasicMetricsCollector.java
----------------------------------------------------------------------
diff --git a/storm-perf/src/main/java/org/apache/storm/perf/utils/BasicMetricsCollector.java b/storm-perf/src/main/java/org/apache/storm/perf/utils/BasicMetricsCollector.java
deleted file mode 100755
index 686f9da..0000000
--- a/storm-perf/src/main/java/org/apache/storm/perf/utils/BasicMetricsCollector.java
+++ /dev/null
@@ -1,309 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License
- */
-
-package org.apache.storm.perf.utils;
-
-import org.apache.storm.LocalCluster;
-import org.apache.storm.generated.Nimbus;
-import org.apache.storm.utils.NimbusClient;
-import org.apache.storm.utils.Utils;
-import org.apache.log4j.Logger;
-
-import java.io.PrintWriter;
-import java.util.*;
-
-
-public class BasicMetricsCollector {
-
- private LocalCluster localCluster = null;
- private Nimbus.Client client = null;
- private PrintWriter dataWriter;
- private long startTime=0;
-
- public enum MetricsItem {
- TOPOLOGY_STATS,
- XSFER_RATE,
- SPOUT_THROUGHPUT,
- SPOUT_LATENCY,
- ALL
- }
-
-
- /* headers */
- public static final String TIME = "elapsed (sec)";
- public static final String TIME_FORMAT = "%d";
- public static final String TOTAL_SLOTS = "total_slots";
- public static final String USED_SLOTS = "used_slots";
- public static final String WORKERS = "workers";
- public static final String TASKS = "tasks";
- public static final String EXECUTORS = "executors";
- public static final String TRANSFERRED = "transferred (messages)";
- public static final String XSFER_RATE = "transfer rate (messages/s)";
- public static final String SPOUT_EXECUTORS = "spout_executors";
- public static final String SPOUT_TRANSFERRED = "spout_transferred (messages)";
- public static final String SPOUT_ACKED = "spout_acks";
- public static final String SPOUT_THROUGHPUT = "spout_throughput (acks/s)";
- public static final String SPOUT_AVG_COMPLETE_LATENCY = "spout_avg_complete_latency(ms)";
- public static final String SPOUT_AVG_LATENCY_FORMAT = "%.1f";
- public static final String SPOUT_MAX_COMPLETE_LATENCY = "spout_max_complete_latency(ms)";
- public static final String SPOUT_MAX_LATENCY_FORMAT = "%.1f";
- private static final Logger LOG = Logger.getLogger(BasicMetricsCollector.class);
- final MetricsCollectorConfig config;
- // final StormTopology topology;
- final Set<String> header = new LinkedHashSet<String>();
- final Map<String, String> metrics = new HashMap<String, String>();
- int lineNumber = 0;
-
- final boolean collectTopologyStats;
- final boolean collectExecutorStats;
- final boolean collectThroughput;
-
- final boolean collectSpoutThroughput;
- final boolean collectSpoutLatency;
-
- private MetricsSample lastSample;
- private MetricsSample curSample;
- private double maxLatency = 0;
-
- boolean first = true;
-
- public BasicMetricsCollector(Nimbus.Client client, String topoName, Map stormConfig) {
- this(topoName, stormConfig);
- this.client = client;
- this.localCluster = null;
- }
-
- public BasicMetricsCollector(LocalCluster localCluster, String topoName, Map stormConfig) {
- this(topoName, stormConfig);
- this.client = null;
- this.localCluster = localCluster;
- }
-
- private BasicMetricsCollector(String topoName, Map stormConfig) {
- Set<MetricsItem> items = getMetricsToCollect();
- this.config = new MetricsCollectorConfig(topoName, stormConfig);
- collectTopologyStats = collectTopologyStats(items);
- collectExecutorStats = collectExecutorStats(items);
- collectThroughput = collectThroughput(items);
- collectSpoutThroughput = collectSpoutThroughput(items);
- collectSpoutLatency = collectSpoutLatency(items);
- dataWriter = new PrintWriter(System.err);
- }
-
-
- private Set<MetricsItem> getMetricsToCollect() {
- Set<MetricsItem> result = new HashSet<>();
- result.add(MetricsItem.ALL);
- return result;
- }
-
- public void collect(Nimbus.Client client) {
- try {
- if (!first) {
- this.lastSample = this.curSample;
- this.curSample = MetricsSample.factory(client, config.name);
- updateStats(dataWriter);
- writeLine(dataWriter);
- } else {
- LOG.info("Getting baseline metrics sample.");
- writeHeader(dataWriter);
- this.curSample = MetricsSample.factory(client, config.name);
- first = false;
- startTime = System.currentTimeMillis();
- }
- } catch (Exception e) {
- LOG.error("storm metrics failed! ", e);
- }
- }
-
- public void collect(LocalCluster localCluster) {
- try {
- if (!first) {
- this.lastSample = this.curSample;
- this.curSample = MetricsSample.factory(localCluster, config.name);
- updateStats(dataWriter);
- writeLine(dataWriter);
- } else {
- LOG.info("Getting baseline metrics sample.");
- writeHeader(dataWriter);
- this.curSample = MetricsSample.factory(localCluster, config.name);
- first = false;
- startTime = System.currentTimeMillis();
- }
- } catch (Exception e) {
- LOG.error("storm metrics failed! ", e);
- }
- }
-
- public void close() {
- dataWriter.close();
- }
-
- boolean updateStats(PrintWriter writer)
- throws Exception {
- if (collectTopologyStats) {
- updateTopologyStats();
- }
- if (collectExecutorStats) {
- updateExecutorStats();
- }
- return true;
- }
-
- void updateTopologyStats() {
- long timeTotal = System.currentTimeMillis() - startTime;
- int numWorkers = this.curSample.getNumWorkers();
- int numExecutors = this.curSample.getNumExecutors();
- int numTasks = this.curSample.getNumTasks();
- metrics.put(TIME, String.format(TIME_FORMAT, timeTotal / 1000));
- metrics.put(WORKERS, Integer.toString(numWorkers));
- metrics.put(EXECUTORS, Integer.toString(numExecutors));
- metrics.put(TASKS, Integer.toString(numTasks));
- }
-
- void updateExecutorStats() {
- long timeDiff = this.curSample.getSampleTime() - this.lastSample.getSampleTime();
- long transferredDiff = this.curSample.getTotalTransferred() - this.lastSample.getTotalTransferred();
- long throughput = transferredDiff / (timeDiff / 1000);
-
- long spoutDiff = this.curSample.getSpoutTransferred() - this.lastSample.getSpoutTransferred();
- long spoutAckedDiff = this.curSample.getTotalAcked() - this.lastSample.getTotalAcked();
- long spoutThroughput = spoutDiff / (timeDiff / 1000);
-
- if (collectThroughput) {
- metrics.put(TRANSFERRED, Long.toString(transferredDiff));
- metrics.put(XSFER_RATE, Long.toString(throughput));
- }
-
- if (collectSpoutThroughput) {
-
- metrics.put(SPOUT_EXECUTORS, Integer.toString(this.curSample.getSpoutExecutors()));
- metrics.put(SPOUT_TRANSFERRED, Long.toString(spoutDiff));
- metrics.put(SPOUT_ACKED, Long.toString(spoutAckedDiff));
- metrics.put(SPOUT_THROUGHPUT, Long.toString(spoutThroughput));
- }
-
-
- if (collectSpoutLatency) {
- double latency = this.curSample.getTotalLatency();
- if (latency > this.maxLatency) {
- this.maxLatency = latency;
- }
- metrics.put(SPOUT_AVG_COMPLETE_LATENCY,
- String.format(SPOUT_AVG_LATENCY_FORMAT, latency));
- metrics.put(SPOUT_MAX_COMPLETE_LATENCY,
- String.format(SPOUT_MAX_LATENCY_FORMAT, this.maxLatency));
-
- }
- }
-
-
- void writeHeader(PrintWriter writer) {
- header.add(TIME);
- if (collectTopologyStats) {
- header.add(WORKERS);
- header.add(TASKS);
- header.add(EXECUTORS);
- }
-
- if (collectThroughput) {
- header.add(TRANSFERRED);
- header.add(XSFER_RATE);
- }
-
- if (collectSpoutThroughput) {
- header.add(SPOUT_EXECUTORS);
- header.add(SPOUT_TRANSFERRED);
- header.add(SPOUT_ACKED);
- header.add(SPOUT_THROUGHPUT);
- }
-
- if (collectSpoutLatency) {
- header.add(SPOUT_AVG_COMPLETE_LATENCY);
- header.add(SPOUT_MAX_COMPLETE_LATENCY);
- }
-
- writer.println("\n------------------------------------------------------------------------------------------------------------------");
- String str = Utils.join(header, ",");
- writer.println(str);
- writer.println("------------------------------------------------------------------------------------------------------------------");
- writer.flush();
- }
-
- void writeLine(PrintWriter writer) {
- List<String> line = new LinkedList<String>();
- for (String h : header) {
- line.add(metrics.get(h));
- }
- String str = Utils.join(line, ",");
- writer.println(str);
- writer.flush();
- }
-
-
- boolean collectTopologyStats(Set<MetricsItem> items) {
- return items.contains(MetricsItem.ALL) ||
- items.contains(MetricsItem.TOPOLOGY_STATS);
- }
-
- boolean collectExecutorStats(Set<MetricsItem> items) {
- return items.contains(MetricsItem.ALL) ||
- items.contains(MetricsItem.XSFER_RATE) ||
- items.contains(MetricsItem.SPOUT_LATENCY);
- }
-
- boolean collectThroughput(Set<MetricsItem> items) {
- return items.contains(MetricsItem.ALL) ||
- items.contains(MetricsItem.XSFER_RATE);
- }
-
- boolean collectSpoutThroughput(Set<MetricsItem> items) {
- return items.contains(MetricsItem.ALL) ||
- items.contains(MetricsItem.SPOUT_THROUGHPUT);
- }
-
- boolean collectSpoutLatency(Set<MetricsItem> items) {
- return items.contains(MetricsItem.ALL) ||
- items.contains(MetricsItem.SPOUT_LATENCY);
- }
-
-
-
- public static class MetricsCollectorConfig {
- private static final Logger LOG = Logger.getLogger(MetricsCollectorConfig.class);
-
- // storm configuration
- public final Map stormConfig;
- // storm topology name
- public final String name;
- // benchmark label
- public final String label;
-
- public MetricsCollectorConfig(String topoName, Map stormConfig) {
- this.stormConfig = stormConfig;
- String labelStr = (String) stormConfig.get("benchmark.label");
- this.name = topoName;
- if (labelStr == null) {
- LOG.warn("'benchmark.label' not found in config. Defaulting to topology name");
- labelStr = this.name;
- }
- this.label = labelStr;
- }
- } // MetricsCollectorConfig
-
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/storm/blob/2a369e40/storm-perf/src/main/java/org/apache/storm/perf/utils/Helper.java
----------------------------------------------------------------------
diff --git a/storm-perf/src/main/java/org/apache/storm/perf/utils/Helper.java b/storm-perf/src/main/java/org/apache/storm/perf/utils/Helper.java
deleted file mode 100755
index 8bcd84f..0000000
--- a/storm-perf/src/main/java/org/apache/storm/perf/utils/Helper.java
+++ /dev/null
@@ -1,131 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License
- */
-
-package org.apache.storm.perf.utils;
-
-import org.apache.storm.Config;
-import org.apache.storm.LocalCluster;
-import org.apache.storm.StormSubmitter;
-import org.apache.storm.generated.KillOptions;
-import org.apache.storm.generated.Nimbus;
-import org.apache.storm.generated.StormTopology;
-import org.apache.storm.utils.NimbusClient;
-import org.apache.storm.utils.Utils;
-
-import java.util.Map;
-
-
-public class Helper {
-
- public static void kill(Nimbus.Client client, String topoName) throws Exception {
- KillOptions opts = new KillOptions();
- opts.set_wait_secs(0);
- client.killTopologyWithOpts(topoName, opts);
- }
-
- public static void killAndShutdownCluster(LocalCluster cluster, String topoName) throws Exception {
- KillOptions opts = new KillOptions();
- opts.set_wait_secs(0);
- cluster.killTopologyWithOpts(topoName, opts);
- cluster.shutdown();
- }
-
-
- public static LocalCluster runOnLocalCluster(String topoName, StormTopology topology) throws Exception {
- LocalCluster cluster = new LocalCluster();
- cluster.submitTopology(topoName, new Config(), topology);
- return cluster;
- }
-
- public static int getInt(Map map, Object key, int def) {
- return Utils.getInt(Utils.get(map, key, def));
- }
-
- public static String getStr(Map map, Object key) {
- return (String) map.get(key);
- }
-
- public static void collectMetricsAndKill(String topologyName, Integer pollInterval, Integer duration) throws Exception {
- Map clusterConf = Utils.readStormConfig();
- Nimbus.Client client = NimbusClient.getConfiguredClient(clusterConf).getClient();
- BasicMetricsCollector metricsCollector = new BasicMetricsCollector(client, topologyName, clusterConf);
-
- int times = duration / pollInterval;
- metricsCollector.collect(client);
- for (int i = 0; i < times; i++) {
- Thread.sleep(pollInterval * 1000);
- metricsCollector.collect(client);
- }
- metricsCollector.close();
- kill(client, topologyName);
- }
-
- public static void collectLocalMetricsAndKill(LocalCluster localCluster, String topologyName, Integer pollInterval, Integer duration, Map clusterConf) throws Exception {
- BasicMetricsCollector metricsCollector = new BasicMetricsCollector(localCluster, topologyName, clusterConf);
-
- int times = duration / pollInterval;
- metricsCollector.collect(localCluster);
- for (int i = 0; i < times; i++) {
- Thread.sleep(pollInterval * 1000);
- metricsCollector.collect(localCluster);
- }
- metricsCollector.close();
- killAndShutdownCluster(localCluster, topologyName);
- }
-
- /** Kill topo and Shutdown local cluster on Ctrl-C */
- public static void setupShutdownHook(final LocalCluster cluster, final String topoName) {
- Runtime.getRuntime().addShutdownHook(new Thread() {
- public void run() {
- try {
- cluster.killTopology(topoName);
- System.out.println("Killed Topology");
- } catch (Exception e) {
- System.err.println("Encountered error in killing topology: " + e);
- }
- cluster.shutdown();
- }
- });
- }
-
- /** Kill topo on Ctrl-C */
- public static void setupShutdownHook(final String topoName) {
- Map clusterConf = Utils.readStormConfig();
- final Nimbus.Client client = NimbusClient.getConfiguredClient(clusterConf).getClient();
- Runtime.getRuntime().addShutdownHook(new Thread() {
- public void run() {
- try {
- Helper.kill(client, topoName);
- System.out.println("Killed Topology");
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
- });
- }
-
- public static void runOnClusterAndPrintMetrics(Integer durationSec, String topoName, Map topoConf, StormTopology topology) throws Exception {
- // submit topology
- StormSubmitter.submitTopologyWithProgressBar(topoName, topoConf, topology);
- setupShutdownHook(topoName); // handle Ctrl-C
-
- // poll metrics every minute, then kill topology after specified duration
- Integer pollIntervalSec = 60;
- collectMetricsAndKill(topoName, pollIntervalSec, durationSec);
- }
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/2a369e40/storm-perf/src/main/java/org/apache/storm/perf/utils/IdentityBolt.java
----------------------------------------------------------------------
diff --git a/storm-perf/src/main/java/org/apache/storm/perf/utils/IdentityBolt.java b/storm-perf/src/main/java/org/apache/storm/perf/utils/IdentityBolt.java
deleted file mode 100755
index 396ad53..0000000
--- a/storm-perf/src/main/java/org/apache/storm/perf/utils/IdentityBolt.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License
- */
-
-package org.apache.storm.perf.utils;
-
-
-import org.apache.storm.task.OutputCollector;
-import org.apache.storm.task.TopologyContext;
-import org.apache.storm.topology.OutputFieldsDeclarer;
-import org.apache.storm.topology.base.BaseRichBolt;
-import org.apache.storm.tuple.Tuple;
-import org.apache.storm.tuple.Values;
-
-import java.util.Map;
-
-
-public class IdentityBolt extends BaseRichBolt {
- private OutputCollector collector;
-
- @Override
- public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
- this.collector = collector;
- }
-
- @Override
- public void execute(Tuple tuple) {
- collector.emit(tuple, tuple.getValues() );
- collector.ack(tuple);
- }
-
- @Override
- public void declareOutputFields(OutputFieldsDeclarer declarer) {
-
- }
-}
-