You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by ka...@apache.org on 2017/04/05 22:34:07 UTC
[11/14] storm git commit: STORM-2416 Release Packaging Improvements
http://git-wip-us.apache.org/repos/asf/storm/blob/2a369e40/examples/storm-jms-examples/src/main/java/org/apache/storm/jms/example/SpringJmsProvider.java
----------------------------------------------------------------------
diff --git a/examples/storm-jms-examples/src/main/java/org/apache/storm/jms/example/SpringJmsProvider.java b/examples/storm-jms-examples/src/main/java/org/apache/storm/jms/example/SpringJmsProvider.java
new file mode 100644
index 0000000..306fc25
--- /dev/null
+++ b/examples/storm-jms-examples/src/main/java/org/apache/storm/jms/example/SpringJmsProvider.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.jms.example;
+
+import javax.jms.ConnectionFactory;
+import javax.jms.Destination;
+
+import org.springframework.context.ApplicationContext;
+import org.springframework.context.support.ClassPathXmlApplicationContext;
+
+import org.apache.storm.jms.JmsProvider;
+
+
+/**
+ * A <code>JmsProvider</code> that uses the Spring framework
+ * to obtain a JMS <code>ConnectionFactory</code> and
+ * <code>Destination</code> objects.
+ * <p>
+ * The constructor takes three arguments:
+ * <ol>
+ * <li>A string pointing to the Spring application context file containing the JMS configuration
+ * (must be on the classpath)
+ * </li>
+ * <li>The name of the connection factory bean</li>
+ * <li>The name of the destination bean</li>
+ * </ol>
+ * Both beans are looked up once, when the provider is constructed;
+ * <code>connectionFactory()</code> and <code>destination()</code> simply
+ * return those stored references on every call.
+ */
+@SuppressWarnings("serial")
+public class SpringJmsProvider implements JmsProvider {
+ private ConnectionFactory connectionFactory;
+ private Destination destination;
+
+ /**
+ * Constructs a <code>SpringJmsProvider</code> by loading the Spring
+ * application context from the given classpath resource and fetching the
+ * JMS connection factory and destination beans from it by name.
+ *
+ * @param appContextClasspathResource - the Spring configuration file (classpath resource)
+ * @param connectionFactoryBean - the JMS connection factory bean name
+ * @param destinationBean - the JMS destination bean name
+ */
+ public SpringJmsProvider(String appContextClasspathResource, String connectionFactoryBean, String destinationBean){
+ ApplicationContext context = new ClassPathXmlApplicationContext(appContextClasspathResource);
+ this.connectionFactory = (ConnectionFactory)context.getBean(connectionFactoryBean);
+ this.destination = (Destination)context.getBean(destinationBean);
+ }
+
+ public ConnectionFactory connectionFactory() throws Exception {
+ return this.connectionFactory;
+ }
+
+ public Destination destination() throws Exception {
+ return this.destination;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/2a369e40/examples/storm-jms-examples/src/main/resources/jms-activemq.xml
----------------------------------------------------------------------
diff --git a/examples/storm-jms-examples/src/main/resources/jms-activemq.xml b/examples/storm-jms-examples/src/main/resources/jms-activemq.xml
new file mode 100644
index 0000000..1a845b8
--- /dev/null
+++ b/examples/storm-jms-examples/src/main/resources/jms-activemq.xml
@@ -0,0 +1,53 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<beans
+ xmlns="http://www.springframework.org/schema/beans"
+ xmlns:amq="http://activemq.apache.org/schema/core"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.0.xsd
+ http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd">
+
+ <!-- ActiveMQ -->
+
+ <!-- embedded ActiveMQ Broker -->
+ <!-- <amq:broker useJmx="false" persistent="false">
+ <amq:transportConnectors>
+ <amq:transportConnector uri="tcp://localhost:61616" />
+ </amq:transportConnectors>
+ </amq:broker> -->
+
+ <amq:queue id="notificationQueue" physicalName="backtype.storm.contrib.example.queue" />
+
+ <amq:topic id="notificationTopic" physicalName="backtype.storm.contrib.example.topic" />
+
+ <amq:connectionFactory id="jmsConnectionFactory"
+ brokerURL="tcp://localhost:61616" />
+
+ <!-- <bean id="queueTemplate" class="org.springframework.jms.core.JmsTemplate">
+ <property name="connectionFactory">
+ <ref bean="jmsConnectionFactory" />
+ </property>
+ <property name="pubSubDomain" value="false" />
+ </bean> -->
+
+</beans>
+
+
+
+
+
http://git-wip-us.apache.org/repos/asf/storm/blob/2a369e40/examples/storm-jms-examples/src/main/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/examples/storm-jms-examples/src/main/resources/log4j.properties b/examples/storm-jms-examples/src/main/resources/log4j.properties
new file mode 100644
index 0000000..079b195
--- /dev/null
+++ b/examples/storm-jms-examples/src/main/resources/log4j.properties
@@ -0,0 +1,29 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+log4j.rootLogger=INFO, stdout
+
+log4j.appender.stdout=org.apache.log4j.ConsoleAppender
+log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
+
+log4j.appender.stdout.layout.ConversionPattern=%5p (%C:%L) - %m%n
+
+
+log4j.logger.backtype.storm.contrib=DEBUG
+log4j.logger.clojure.contrib=WARN
+log4j.logger.org.springframework=WARN
+log4j.logger.org.apache.zookeeper=WARN
+
http://git-wip-us.apache.org/repos/asf/storm/blob/2a369e40/examples/storm-kafka-client-examples/pom.xml
----------------------------------------------------------------------
diff --git a/examples/storm-kafka-client-examples/pom.xml b/examples/storm-kafka-client-examples/pom.xml
index 1c4449b..554818b 100644
--- a/examples/storm-kafka-client-examples/pom.xml
+++ b/examples/storm-kafka-client-examples/pom.xml
@@ -31,20 +31,6 @@
<artifactId>storm-kafka-client-examples</artifactId>
- <properties>
- <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
- <provided.scope>provided</provided.scope>
- </properties>
-
- <profiles>
- <profile>
- <id>intellij</id>
- <properties>
- <provided.scope>compile</provided.scope>
- </properties>
- </profile>
- </profiles>
-
<dependencies>
<dependency>
<groupId>org.apache.storm</groupId>
http://git-wip-us.apache.org/repos/asf/storm/blob/2a369e40/examples/storm-kafka-examples/pom.xml
----------------------------------------------------------------------
diff --git a/examples/storm-kafka-examples/pom.xml b/examples/storm-kafka-examples/pom.xml
index 8c1e74d..65eeda6 100644
--- a/examples/storm-kafka-examples/pom.xml
+++ b/examples/storm-kafka-examples/pom.xml
@@ -27,20 +27,6 @@
<artifactId>storm-kafka-examples</artifactId>
- <properties>
- <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
- <provided.scope>provided</provided.scope>
- </properties>
-
- <profiles>
- <profile>
- <id>intellij</id>
- <properties>
- <provided.scope>compile</provided.scope>
- </properties>
- </profile>
- </profiles>
-
<dependencies>
<dependency>
<groupId>org.apache.storm</groupId>
http://git-wip-us.apache.org/repos/asf/storm/blob/2a369e40/examples/storm-mongodb-examples/pom.xml
----------------------------------------------------------------------
diff --git a/examples/storm-mongodb-examples/pom.xml b/examples/storm-mongodb-examples/pom.xml
index 6def943..297a6e8 100644
--- a/examples/storm-mongodb-examples/pom.xml
+++ b/examples/storm-mongodb-examples/pom.xml
@@ -27,20 +27,6 @@
<artifactId>storm-mongodb-examples</artifactId>
- <properties>
- <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
- <provided.scope>provided</provided.scope>
- </properties>
-
- <profiles>
- <profile>
- <id>intellij</id>
- <properties>
- <provided.scope>compile</provided.scope>
- </properties>
- </profile>
- </profiles>
-
<dependencies>
<dependency>
<groupId>org.apache.storm</groupId>
http://git-wip-us.apache.org/repos/asf/storm/blob/2a369e40/examples/storm-mqtt-examples/pom.xml
----------------------------------------------------------------------
diff --git a/examples/storm-mqtt-examples/pom.xml b/examples/storm-mqtt-examples/pom.xml
index ab061c4..ca2d03b 100644
--- a/examples/storm-mqtt-examples/pom.xml
+++ b/examples/storm-mqtt-examples/pom.xml
@@ -31,20 +31,6 @@
<relativePath>../../pom.xml</relativePath>
</parent>
- <properties>
- <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
- <provided.scope>provided</provided.scope>
- </properties>
-
- <profiles>
- <profile>
- <id>intellij</id>
- <properties>
- <provided.scope>compile</provided.scope>
- </properties>
- </profile>
- </profiles>
-
<dependencies>
<dependency>
<groupId>org.apache.storm</groupId>
http://git-wip-us.apache.org/repos/asf/storm/blob/2a369e40/examples/storm-opentsdb-examples/pom.xml
----------------------------------------------------------------------
diff --git a/examples/storm-opentsdb-examples/pom.xml b/examples/storm-opentsdb-examples/pom.xml
index 2a24f8d..9ffd12f 100644
--- a/examples/storm-opentsdb-examples/pom.xml
+++ b/examples/storm-opentsdb-examples/pom.xml
@@ -27,20 +27,6 @@
<artifactId>storm-opentsdb-examples</artifactId>
- <properties>
- <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
- <provided.scope>provided</provided.scope>
- </properties>
-
- <profiles>
- <profile>
- <id>intellij</id>
- <properties>
- <provided.scope>compile</provided.scope>
- </properties>
- </profile>
- </profiles>
-
<dependencies>
<dependency>
<groupId>org.apache.storm</groupId>
http://git-wip-us.apache.org/repos/asf/storm/blob/2a369e40/examples/storm-perf/README.markdown
----------------------------------------------------------------------
diff --git a/examples/storm-perf/README.markdown b/examples/storm-perf/README.markdown
new file mode 100644
index 0000000..946ab21
--- /dev/null
+++ b/examples/storm-perf/README.markdown
@@ -0,0 +1,50 @@
+# Topologies for measuring Storm performance
+
+This module includes topologies designed for measuring Storm performance.
+
+## Overview
+There are two basic modes for running these topologies
+
+- **Cluster mode:** Submits the topology to a storm cluster. This mode is useful for benchmarking. It calculates throughput and latency numbers every minute and prints them on the console.
+- **In-process mode:** Uses LocalCluster to run the topology. This mode helps identify bottlenecks using profilers like JProfiler from within an IDE. This mode does not print metrics.
+
+In both modes, a shutdown hook is set up to terminate the topology when the program that is submitting the topology is terminated.
+
+The bundled topologies can be classified into two types.
+
+- Topologies that measure purely the internal functioning of Storm. Such topologies do not interact with external systems like Kafka or Hdfs.
+- Topologies that measure speed of I/O with external systems like Kafka and Hdfs.
+
+Topologies that measure internal performance can be run in either in-proc or cluster modes.
+Topologies that measure I/O with external systems are designed to run in cluster mode only.
+
+## Topologies List
+
+1. **ConstSpoutOnlyTopo:** Helps measure how fast a spout can emit. This topology has only a spout, which is not connected to any bolts. Supports in-proc and cluster mode.
+2. **ConstSpoutNullBoltTopo:** Helps measure how fast spout can send data to a bolt. Spout emits a stream of constant values to a DevNull bolt which discards the incoming tuples. Supports in-proc and cluster mode.
+3. **ConstSpoutIdBoltNullBoltTopo:** Helps measure speed of messaging between spouts and bolts. Spout emits a stream of constant values to an ID bolt which clones the tuple and emits it downstream to a DevNull bolt. Supports in-proc and cluster mode.
+4. **FileReadWordCount:** Measures speed of word counting. The spout loads a file into memory and emits these lines in an infinite loop. Supports in-proc and cluster mode.
+5. **HdfsSpoutNullBolt:** Measures speed at which HdfsSpout can read from HDFS. Supports cluster mode only.
+6. **StrGenSpoutHdfsBoltTopo:** Measures speed at which HdfsBolt can write to HDFS. Supports cluster mode only.
+7. **KafkaSpoutNullBolt:** Measures speed at which KafkaSpout can read from Kafka. Supports cluster mode only.
+8. **KafkaHdfsTopo:** Measures how fast Storm can read from Kafka and write to HDFS. Supports cluster mode only.
+
+
+## How to run ?
+
+### In-process mode:
+This mode is intended for running the topology quickly and easily from within the IDE and does not expect any command line arguments.
+Simply running the Topology's main() method without any arguments will get it running. The topology runs indefinitely till the program is terminated.
+
+
+### Cluster mode:
+When the topology is run with one or more command-line arguments, it is submitted to the cluster.
+The first argument indicates how long the topology should be run. Often the second argument refers to a yaml config
+file which contains topology configuration settings. The conf/ directory in this module contains sample config files
+with names matching the corresponding topology.
+
+These topologies can be run using the standard storm jar command.
+
+```
+bin/storm jar /path/storm-perf-1.1.0-jar-with-dependencies.jar org.apache.storm.perf.ConstSpoutNullBoltTopo 200 conf/ConstSpoutNullBoltTopo.yaml
+```
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/storm/blob/2a369e40/examples/storm-perf/pom.xml
----------------------------------------------------------------------
diff --git a/examples/storm-perf/pom.xml b/examples/storm-perf/pom.xml
new file mode 100644
index 0000000..64ae889
--- /dev/null
+++ b/examples/storm-perf/pom.xml
@@ -0,0 +1,107 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <parent>
+ <artifactId>storm</artifactId>
+ <groupId>org.apache.storm</groupId>
+ <version>2.0.0-SNAPSHOT</version>
+ <relativePath>../../pom.xml</relativePath>
+ </parent>
+
+ <groupId>org.apache.storm</groupId>
+ <artifactId>storm-perf</artifactId>
+ <packaging>jar</packaging>
+ <name>Storm Perf</name>
+ <description>Topologies and tools to measure performance.</description>
+
+ <build>
+ <plugins>
+ <plugin>
+ <artifactId>maven-assembly-plugin</artifactId>
+ <configuration>
+ <descriptorRefs>
+ <descriptorRef>jar-with-dependencies</descriptorRef>
+ </descriptorRefs>
+ <archive>
+ <manifest>
+ <mainClass />
+ </manifest>
+ </archive>
+ </configuration>
+ <executions>
+ <execution>
+ <id>make-assembly</id>
+ <phase>package</phase>
+ <goals>
+ <goal>single</goal>
+ </goals>
+ </execution>
+ </executions>
+
+ </plugin>
+
+ <plugin>
+ <groupId>org.codehaus.mojo</groupId>
+ <artifactId>exec-maven-plugin</artifactId>
+ <version>1.2.1</version>
+ <executions>
+ <execution>
+ <goals>
+ <goal>exec</goal>
+ </goals>
+ </execution>
+ </executions>
+ <configuration>
+ <executable>java</executable>
+ <includeProjectDependencies>true</includeProjectDependencies>
+ <includePluginDependencies>false</includePluginDependencies>
+ <classpathScope>compile</classpathScope>
+ <mainClass>${storm.topology}</mainClass>
+ </configuration>
+ </plugin>
+ </plugins>
+ </build>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.storm</groupId>
+ <artifactId>storm-core</artifactId>
+ <version>${project.version}</version>
+ <!--
+ Use "provided" scope to keep storm out of the jar-with-dependencies
+ For IntelliJ dev, intellij will load properly.
+ -->
+ <scope>${provided.scope}</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.storm</groupId>
+ <artifactId>storm-kafka</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.storm</groupId>
+ <artifactId>storm-hdfs</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ </dependencies>
+
+
+</project>
http://git-wip-us.apache.org/repos/asf/storm/blob/2a369e40/examples/storm-perf/src/main/conf/ConstSpoutIdBoltNullBoltTopo.yaml
----------------------------------------------------------------------
diff --git a/examples/storm-perf/src/main/conf/ConstSpoutIdBoltNullBoltTopo.yaml b/examples/storm-perf/src/main/conf/ConstSpoutIdBoltNullBoltTopo.yaml
new file mode 100644
index 0000000..9f74aee
--- /dev/null
+++ b/examples/storm-perf/src/main/conf/ConstSpoutIdBoltNullBoltTopo.yaml
@@ -0,0 +1,22 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+spout.count : 1
+bolt1.count : 1 # IdBolt instances
+bolt2.count : 1 # DevNullBolt instances
+
+# storm config overrides
+topology.workers : 1
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/storm/blob/2a369e40/examples/storm-perf/src/main/conf/ConstSpoutNullBoltTopo.yaml
----------------------------------------------------------------------
diff --git a/examples/storm-perf/src/main/conf/ConstSpoutNullBoltTopo.yaml b/examples/storm-perf/src/main/conf/ConstSpoutNullBoltTopo.yaml
new file mode 100644
index 0000000..51f2dd7
--- /dev/null
+++ b/examples/storm-perf/src/main/conf/ConstSpoutNullBoltTopo.yaml
@@ -0,0 +1,22 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+spout.count : 1
+bolt.count : 1
+grouping : "local" # either "shuffle" or "local"
+
+# storm config overrides
+topology.workers : 1
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/storm/blob/2a369e40/examples/storm-perf/src/main/conf/FileReadWordCountTopo.yaml
----------------------------------------------------------------------
diff --git a/examples/storm-perf/src/main/conf/FileReadWordCountTopo.yaml b/examples/storm-perf/src/main/conf/FileReadWordCountTopo.yaml
new file mode 100644
index 0000000..61abe8f
--- /dev/null
+++ b/examples/storm-perf/src/main/conf/FileReadWordCountTopo.yaml
@@ -0,0 +1,23 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+spout.count : 1
+splitter.count : 1
+counter.count : 1
+input.file : "/Users/roshan/Projects/idea/storm/storm-perf/src/main/resources/randomwords.txt" # NOTE(review): looks like a developer-local absolute path; point this at your own input file
+
+# storm config overrides
+topology.workers : 1
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/storm/blob/2a369e40/examples/storm-perf/src/main/conf/HdfsSpoutNullBoltTopo.yaml
----------------------------------------------------------------------
diff --git a/examples/storm-perf/src/main/conf/HdfsSpoutNullBoltTopo.yaml b/examples/storm-perf/src/main/conf/HdfsSpoutNullBoltTopo.yaml
new file mode 100644
index 0000000..a06ad6e
--- /dev/null
+++ b/examples/storm-perf/src/main/conf/HdfsSpoutNullBoltTopo.yaml
@@ -0,0 +1,25 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+spout.count : 1
+bolt.count : 1
+hdfs.uri : "hdfs://hdfs.namenode:8020"
+hdfs.source.dir : "/tmp/storm/in"
+hdfs.archive.dir : "/tmp/storm/done"
+hdfs.bad.dir : "/tmp/storm/bad"
+
+# storm config overrides
+topology.workers : 1
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/storm/blob/2a369e40/examples/storm-perf/src/main/conf/KafkaHdfsTopo.yaml
----------------------------------------------------------------------
diff --git a/examples/storm-perf/src/main/conf/KafkaHdfsTopo.yaml b/examples/storm-perf/src/main/conf/KafkaHdfsTopo.yaml
new file mode 100755
index 0000000..a8ed2f2
--- /dev/null
+++ b/examples/storm-perf/src/main/conf/KafkaHdfsTopo.yaml
@@ -0,0 +1,26 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+spout.count : 1
+bolt.count : 1
+kafka.topic : "kafka_topic"
+zk.uri : "zkhostname:2181"
+hdfs.uri : "hdfs://hdfs.namenode:8020"
+hdfs.dir : "/tmp/storm"
+hdfs.batch : 1000
+
+# storm config overrides
+topology.workers : 1
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/storm/blob/2a369e40/examples/storm-perf/src/main/conf/KafkaSpoutNullBoltTopo.yaml
----------------------------------------------------------------------
diff --git a/examples/storm-perf/src/main/conf/KafkaSpoutNullBoltTopo.yaml b/examples/storm-perf/src/main/conf/KafkaSpoutNullBoltTopo.yaml
new file mode 100644
index 0000000..cde4c2e
--- /dev/null
+++ b/examples/storm-perf/src/main/conf/KafkaSpoutNullBoltTopo.yaml
@@ -0,0 +1,23 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+spout.count : 1
+bolt.count : 1
+kafka.topic : "kafka_topic"
+zk.uri : "zkhostname:2181"
+
+# storm config overrides
+topology.workers : 1
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/storm/blob/2a369e40/examples/storm-perf/src/main/conf/StrGenSpoutHdfsBoltTopo.yaml
----------------------------------------------------------------------
diff --git a/examples/storm-perf/src/main/conf/StrGenSpoutHdfsBoltTopo.yaml b/examples/storm-perf/src/main/conf/StrGenSpoutHdfsBoltTopo.yaml
new file mode 100644
index 0000000..d16431b
--- /dev/null
+++ b/examples/storm-perf/src/main/conf/StrGenSpoutHdfsBoltTopo.yaml
@@ -0,0 +1,25 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+spout.count : 1
+bolt.count : 1
+hdfs.uri : "hdfs://hdfs.namenode:8020"
+hdfs.dir : "/tmp/storm"
+hdfs.batch : 1000
+
+
+# storm config overrides
+topology.workers : 1
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/storm/blob/2a369e40/examples/storm-perf/src/main/java/org/apache/storm/perf/ConstSpoutIdBoltNullBoltTopo.java
----------------------------------------------------------------------
diff --git a/examples/storm-perf/src/main/java/org/apache/storm/perf/ConstSpoutIdBoltNullBoltTopo.java b/examples/storm-perf/src/main/java/org/apache/storm/perf/ConstSpoutIdBoltNullBoltTopo.java
new file mode 100644
index 0000000..11c63d3
--- /dev/null
+++ b/examples/storm-perf/src/main/java/org/apache/storm/perf/ConstSpoutIdBoltNullBoltTopo.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License
+ */
+
+package org.apache.storm.perf;
+
+import org.apache.storm.Config;
+import org.apache.storm.LocalCluster;
+import org.apache.storm.generated.StormTopology;
+import org.apache.storm.perf.bolt.DevNullBolt;
+import org.apache.storm.perf.bolt.IdBolt;
+import org.apache.storm.perf.spout.ConstSpout;
+import org.apache.storm.perf.utils.Helper;
+import org.apache.storm.topology.TopologyBuilder;
+import org.apache.storm.utils.Utils;
+
+import java.util.Map;
+
+/**
+ * Messaging benchmark: ConstSpout -> IdBolt -> DevNullBolt.
+ * Measures the speed of messaging on the spout->bolt hop and the bolt->bolt hop.
+ * ConstSpout  : continuously emits a constant string
+ * IdBolt      : clones and emits input tuples
+ * DevNullBolt : discards incoming tuples
+ */
+public class ConstSpoutIdBoltNullBoltTopo {
+
+    public static final String TOPOLOGY_NAME = "ConstSpoutIdBoltNullBoltTopo";
+    public static final String SPOUT_ID = "constSpout";
+    public static final String BOLT1_ID = "idBolt";
+    public static final String BOLT2_ID = "nullBolt";
+
+    // Config keys holding the parallelism of each component
+    public static final String BOLT1_COUNT = "bolt1.count";
+    public static final String BOLT2_COUNT = "bolt2.count";
+    public static final String SPOUT_COUNT = "spout.count";
+
+    /**
+     * Wires constSpout -> idBolt -> nullBolt, using localOrShuffle grouping on both hops.
+     * Component counts are looked up in {@code conf}, with 1 passed as the fallback.
+     */
+    public static StormTopology getTopology(Map conf) {
+        ConstSpout constSpout = new ConstSpout("some data").withOutputFields("str");
+        IdBolt idBolt = new IdBolt();
+        DevNullBolt sink = new DevNullBolt();
+
+        TopologyBuilder topo = new TopologyBuilder();
+        topo.setSpout(SPOUT_ID, constSpout, Helper.getInt(conf, SPOUT_COUNT, 1));
+        topo.setBolt(BOLT1_ID, idBolt, Helper.getInt(conf, BOLT1_COUNT, 1)).localOrShuffleGrouping(SPOUT_ID);
+        topo.setBolt(BOLT2_ID, sink, Helper.getInt(conf, BOLT2_COUNT, 1)).localOrShuffleGrouping(BOLT1_ID);
+        return topo.createTopology();
+    }
+
+    /** With no arguments runs on a local cluster; otherwise submits to a Storm cluster. */
+    public static void main(String[] args) throws Exception {
+        if (args.length > 0) {
+            submitToCluster(args);
+        } else {
+            runOnLocalCluster();
+        }
+    }
+
+    // Local mode: loops until the process is terminated.
+    private static void runOnLocalCluster() throws Exception {
+        LocalCluster local = Helper.runOnLocalCluster(TOPOLOGY_NAME, getTopology(new Config()));
+        Helper.setupShutdownHook(local, TOPOLOGY_NAME);
+        for (;;) { // run indefinitely till Ctrl-C
+            Thread.sleep(20_000_000);
+        }
+    }
+
+    // Cluster mode: args are runDurationSec [optionalConfFile].
+    private static void submitToCluster(String[] args) throws Exception {
+        if (args.length > 2) {
+            System.err.println("args: runDurationSec [optionalConfFile]");
+            return;
+        }
+        Integer runSeconds = Integer.parseInt(args[0]);
+        Map clusterConf;
+        if (args.length == 2) {
+            clusterConf = Utils.findAndReadConfigFile(args[1]);
+        } else {
+            clusterConf = new Config();
+        }
+        Helper.runOnClusterAndPrintMetrics(runSeconds, TOPOLOGY_NAME, clusterConf, getTopology(clusterConf));
+    }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/2a369e40/examples/storm-perf/src/main/java/org/apache/storm/perf/ConstSpoutNullBoltTopo.java
----------------------------------------------------------------------
diff --git a/examples/storm-perf/src/main/java/org/apache/storm/perf/ConstSpoutNullBoltTopo.java b/examples/storm-perf/src/main/java/org/apache/storm/perf/ConstSpoutNullBoltTopo.java
new file mode 100755
index 0000000..92c2787
--- /dev/null
+++ b/examples/storm-perf/src/main/java/org/apache/storm/perf/ConstSpoutNullBoltTopo.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License
+ */
+
+package org.apache.storm.perf;
+
+import org.apache.storm.Config;
+import org.apache.storm.LocalCluster;
+import org.apache.storm.generated.StormTopology;
+import org.apache.storm.perf.bolt.DevNullBolt;
+import org.apache.storm.perf.spout.ConstSpout;
+import org.apache.storm.perf.utils.Helper;
+import org.apache.storm.topology.BoltDeclarer;
+import org.apache.storm.topology.TopologyBuilder;
+import org.apache.storm.utils.Utils;
+
+import java.util.Map;
+
+/**
+ * This topo helps measure the messaging speed between a spout and a bolt.
+ * Spout generates a stream of a fixed string.
+ * Bolt will simply ack and discard the tuple received.
+ */
+public class ConstSpoutNullBoltTopo {
+
+    public static final String TOPOLOGY_NAME = "ConstSpoutNullBoltTopo";
+    public static final String SPOUT_ID = "constSpout";
+    public static final String BOLT_ID = "nullBolt";
+
+    // Configs
+    public static final String BOLT_COUNT = "bolt.count";
+    public static final String SPOUT_COUNT = "spout.count";
+    public static final String GROUPING = "grouping"; // can be 'local' or 'shuffle'
+
+    public static final String LOCAL_GROUPING = "local";
+    /** @deprecated misspelled name kept for existing callers; use {@link #LOCAL_GROUPING}. */
+    @Deprecated
+    public static final String LOCAL_GROPING = LOCAL_GROUPING;
+    public static final String SHUFFLE_GROUPING = "shuffle";
+    public static final String DEFAULT_GROUPING = LOCAL_GROUPING;
+
+    /**
+     * Builds constSpout -> nullBolt, connected with the grouping named by the
+     * {@link #GROUPING} setting (localOrShuffle when the setting is absent).
+     *
+     * @param conf settings supplying the component counts and the grouping type
+     * @return the assembled topology
+     * @throws IllegalArgumentException if the grouping is neither 'local' nor 'shuffle'
+     */
+    public static StormTopology getTopology(Map conf) {
+        ConstSpout spout = new ConstSpout("some data").withOutputFields("str");
+        DevNullBolt bolt = new DevNullBolt();
+
+        TopologyBuilder builder = new TopologyBuilder();
+        builder.setSpout(SPOUT_ID, spout, Helper.getInt(conf, SPOUT_COUNT, 1));
+        BoltDeclarer bd = builder.setBolt(BOLT_ID, bolt, Helper.getInt(conf, BOLT_COUNT, 1));
+
+        String groupingType = Helper.getStr(conf, GROUPING);
+        if (groupingType == null || groupingType.equalsIgnoreCase(DEFAULT_GROUPING)) {
+            bd.localOrShuffleGrouping(SPOUT_ID);
+        } else if (groupingType.equalsIgnoreCase(SHUFFLE_GROUPING)) {
+            bd.shuffleGrouping(SPOUT_ID);
+        } else {
+            // An unrecognised value would otherwise leave the bolt without any input stream.
+            throw new IllegalArgumentException("Unsupported '" + GROUPING + "' value: " + groupingType);
+        }
+        return builder.createTopology();
+    }
+
+    /** ConstSpout -> DevNullBolt with configurable grouping (default localOrShuffle). */
+    public static void main(String[] args) throws Exception {
+        if (args.length <= 0) {
+            // For IDE based profiling ... submit topology to local cluster
+            Config conf = new Config();
+            final LocalCluster cluster = Helper.runOnLocalCluster(TOPOLOGY_NAME, getTopology(conf));
+            Helper.setupShutdownHook(cluster, TOPOLOGY_NAME);
+            while (true) { // run indefinitely till Ctrl-C
+                Thread.sleep(20_000_000);
+            }
+        } else {
+            // For measuring perf against a Storm cluster
+            if (args.length > 2) {
+                System.err.println("args: runDurationSec [optionalConfFile]");
+                return;
+            }
+            Integer durationSec = Integer.parseInt(args[0]);
+            Map topoConf = (args.length == 2) ? Utils.findAndReadConfigFile(args[1]) : new Config();
+            // Submit topology to storm cluster
+            Helper.runOnClusterAndPrintMetrics(durationSec, TOPOLOGY_NAME, topoConf, getTopology(topoConf));
+        }
+    }
+}
+
http://git-wip-us.apache.org/repos/asf/storm/blob/2a369e40/examples/storm-perf/src/main/java/org/apache/storm/perf/ConstSpoutOnlyTopo.java
----------------------------------------------------------------------
diff --git a/examples/storm-perf/src/main/java/org/apache/storm/perf/ConstSpoutOnlyTopo.java b/examples/storm-perf/src/main/java/org/apache/storm/perf/ConstSpoutOnlyTopo.java
new file mode 100755
index 0000000..721ae3d
--- /dev/null
+++ b/examples/storm-perf/src/main/java/org/apache/storm/perf/ConstSpoutOnlyTopo.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License
+ */
+
+package org.apache.storm.perf;
+
+import org.apache.storm.Config;
+import org.apache.storm.LocalCluster;
+import org.apache.storm.generated.StormTopology;
+import org.apache.storm.perf.spout.ConstSpout;
+import org.apache.storm.perf.utils.Helper;
+import org.apache.storm.topology.TopologyBuilder;
+
+
+/**
+ * This topo helps measure how fast a spout can produce data (so no bolts are attached).
+ * The spout generates a stream of a fixed string.
+ */
+public class ConstSpoutOnlyTopo {
+
+    public static final String TOPOLOGY_NAME = "ConstSpoutOnlyTopo";
+    public static final String SPOUT_ID = "constSpout";
+
+    /** Assembles a topology holding only the constant-string spout (parallelism hint 1). */
+    public static StormTopology getTopology() {
+        ConstSpout constSpout = new ConstSpout("some data").withOutputFields("str");
+
+        TopologyBuilder topo = new TopologyBuilder();
+        topo.setSpout(SPOUT_ID, constSpout, 1);
+        return topo.createTopology();
+    }
+
+    /** ConstSpout only topology (No bolts): local cluster without args, Storm cluster otherwise. */
+    public static void main(String[] args) throws Exception {
+        if (args.length > 0) {
+            submitToCluster(args);
+        } else {
+            runOnLocalCluster();
+        }
+    }
+
+    // For IDE based profiling: loops until the process is terminated.
+    private static void runOnLocalCluster() throws Exception {
+        LocalCluster local = Helper.runOnLocalCluster(TOPOLOGY_NAME, getTopology());
+        Helper.setupShutdownHook(local, TOPOLOGY_NAME);
+        for (;;) { // run indefinitely till Ctrl-C
+            Thread.sleep(20_000_000);
+        }
+    }
+
+    // Submits to a Storm cluster; expects exactly one argument (runDurationSec).
+    private static void submitToCluster(String[] args) throws Exception {
+        if (args.length != 1) {
+            System.err.println("args: runDurationSec");
+            return;
+        }
+        Integer runSeconds = Integer.parseInt(args[0]);
+        Helper.runOnClusterAndPrintMetrics(runSeconds, TOPOLOGY_NAME, new Config(), getTopology());
+    }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/2a369e40/examples/storm-perf/src/main/java/org/apache/storm/perf/FileReadWordCountTopo.java
----------------------------------------------------------------------
diff --git a/examples/storm-perf/src/main/java/org/apache/storm/perf/FileReadWordCountTopo.java b/examples/storm-perf/src/main/java/org/apache/storm/perf/FileReadWordCountTopo.java
new file mode 100644
index 0000000..d518c86
--- /dev/null
+++ b/examples/storm-perf/src/main/java/org/apache/storm/perf/FileReadWordCountTopo.java
@@ -0,0 +1,96 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License
+*/
+package org.apache.storm.perf;
+
+
+import org.apache.storm.Config;
+import org.apache.storm.LocalCluster;
+import org.apache.storm.generated.StormTopology;
+import org.apache.storm.perf.bolt.CountBolt;
+import org.apache.storm.perf.bolt.SplitSentenceBolt;
+import org.apache.storm.perf.spout.FileReadSpout;
+import org.apache.storm.perf.utils.Helper;
+import org.apache.storm.topology.TopologyBuilder;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.utils.Utils;
+
+
+import java.util.Map;
+
+/**
+ * This topo helps measure speed of word count.
+ * Spout loads a file into memory on initialization, then emits the lines in an endless loop.
+ */
+public class FileReadWordCountTopo {
+    public static final String SPOUT_ID = "spout";
+    public static final String COUNT_ID = "counter";
+    public static final String SPLIT_ID = "splitter";
+    public static final String TOPOLOGY_NAME = "FileReadWordCountTopo";
+
+    // Config settings
+    public static final String SPOUT_NUM = "spout.count";
+    public static final String SPLIT_NUM = "splitter.count";
+    public static final String COUNT_NUM = "counter.count";
+    public static final String INPUT_FILE = "input.file";
+
+    public static final int DEFAULT_SPOUT_NUM = 1;
+    public static final int DEFAULT_SPLIT_BOLT_NUM = 2;
+    public static final int DEFAULT_COUNT_BOLT_NUM = 2;
+
+    /**
+     * Wires spout -> splitter (localOrShuffle) -> counter (fields grouping on the splitter output).
+     */
+    public static StormTopology getTopology(Map config) {
+        final int spouts = Helper.getInt(config, SPOUT_NUM, DEFAULT_SPOUT_NUM);
+        final int splitters = Helper.getInt(config, SPLIT_NUM, DEFAULT_SPLIT_BOLT_NUM);
+        final int counters = Helper.getInt(config, COUNT_NUM, DEFAULT_COUNT_BOLT_NUM);
+        final String path = Helper.getStr(config, INPUT_FILE);
+
+        FileReadSpout reader = new FileReadSpout(path);
+        Fields splitOutput = new Fields(SplitSentenceBolt.FIELDS);
+
+        TopologyBuilder topo = new TopologyBuilder();
+        topo.setSpout(SPOUT_ID, reader, spouts);
+        topo.setBolt(SPLIT_ID, new SplitSentenceBolt(), splitters).localOrShuffleGrouping(SPOUT_ID);
+        topo.setBolt(COUNT_ID, new CountBolt(), counters).fieldsGrouping(SPLIT_ID, splitOutput);
+        return topo.createTopology();
+    }
+
+    /** Without arguments runs locally on resources/randomwords.txt; else needs runDurationSec confFile. */
+    public static void main(String[] args) throws Exception {
+        if (args.length > 0) {
+            // Submit to Storm cluster
+            if (args.length != 2) {
+                System.err.println("args: runDurationSec confFile");
+                return;
+            }
+            Integer runSeconds = Integer.parseInt(args[0]);
+            Map clusterConf = Utils.findAndReadConfigFile(args[1]);
+            Helper.runOnClusterAndPrintMetrics(runSeconds, TOPOLOGY_NAME, clusterConf, getTopology(clusterConf));
+        } else {
+            // For IDE based profiling ... submit topology to local cluster
+            Config localConf = new Config();
+            localConf.put(INPUT_FILE, "resources/randomwords.txt");
+            LocalCluster local = Helper.runOnLocalCluster(TOPOLOGY_NAME, getTopology(localConf));
+            Helper.setupShutdownHook(local, TOPOLOGY_NAME);
+            for (;;) { // run indefinitely till Ctrl-C
+                Thread.sleep(20_000_000);
+            }
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/2a369e40/examples/storm-perf/src/main/java/org/apache/storm/perf/HdfsSpoutNullBoltTopo.java
----------------------------------------------------------------------
diff --git a/examples/storm-perf/src/main/java/org/apache/storm/perf/HdfsSpoutNullBoltTopo.java b/examples/storm-perf/src/main/java/org/apache/storm/perf/HdfsSpoutNullBoltTopo.java
new file mode 100644
index 0000000..248b523
--- /dev/null
+++ b/examples/storm-perf/src/main/java/org/apache/storm/perf/HdfsSpoutNullBoltTopo.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.perf;
+
+import org.apache.storm.generated.StormTopology;
+import org.apache.storm.hdfs.spout.HdfsSpout;
+import org.apache.storm.hdfs.spout.TextFileReader;
+import org.apache.storm.perf.bolt.DevNullBolt;
+import org.apache.storm.perf.utils.Helper;
+import org.apache.storm.topology.TopologyBuilder;
+import org.apache.storm.utils.Utils;
+
+import java.util.Map;
+
+/**
+ * This topo helps measure speed of reading from Hdfs.
+ * Spout reads from Hdfs.
+ * Bolt acks and discards tuples.
+ */
+public class HdfsSpoutNullBoltTopo {
+    // names
+    static final String TOPOLOGY_NAME = "HdfsSpoutNullBoltTopo";
+    static final String SPOUT_ID = "hdfsSpout";
+    static final String BOLT_ID = "devNullBolt";
+
+    // configs
+    static final String SPOUT_NUM = "spout.count";
+    static final String BOLT_NUM = "bolt.count";
+
+    static final String HDFS_URI = "hdfs.uri";
+    static final String SOURCE_DIR = "hdfs.source.dir";
+    static final String ARCHIVE_DIR = "hdfs.archive.dir";
+    static final String BAD_DIR = "hdfs.bad.dir";
+
+    public static final int DEFAULT_SPOUT_NUM = 1;
+    public static final int DEFAULT_BOLT_NUM = 1;
+
+    /**
+     * Wires hdfsSpout -> devNullBolt with localOrShuffle grouping.
+     *
+     * @param config settings providing the HDFS locations and the component counts
+     * @return the assembled topology
+     */
+    public static StormTopology getTopology(Map config) {
+        final int spouts = Helper.getInt(config, SPOUT_NUM, DEFAULT_SPOUT_NUM);
+        final int sinks = Helper.getInt(config, BOLT_NUM, DEFAULT_BOLT_NUM);
+
+        // NOTE(review): the reader type is looked up under the literal key "text" rather than
+        // a named config constant - possibly the value "text" itself was intended; confirm.
+        final String readerType = Helper.getStr(config, "text");
+        final String uri = Helper.getStr(config, HDFS_URI);
+        final String srcDir = Helper.getStr(config, SOURCE_DIR);
+        final String doneDir = Helper.getStr(config, ARCHIVE_DIR);
+        final String rejectDir = Helper.getStr(config, BAD_DIR);
+
+        HdfsSpout hdfsSpout = new HdfsSpout()
+                .setReaderType(readerType)
+                .setHdfsUri(uri)
+                .setSourceDir(srcDir)
+                .setArchiveDir(doneDir)
+                .setBadFilesDir(rejectDir)
+                .withOutputFields(TextFileReader.defaultFields);
+
+        DevNullBolt devNull = new DevNullBolt();
+
+        TopologyBuilder topo = new TopologyBuilder();
+        topo.setSpout(SPOUT_ID, hdfsSpout, spouts);
+        topo.setBolt(BOLT_ID, devNull, sinks).localOrShuffleGrouping(SPOUT_ID);
+        return topo.createTopology();
+    }
+
+    /**
+     * Submits the topology to a Storm cluster via {@code Helper.runOnClusterAndPrintMetrics}.
+     * Expects exactly two arguments: runDurationSec topConfFile.
+     */
+    public static void main(String[] args) throws Exception {
+        if (args.length == 2) {
+            Integer runSeconds = Integer.parseInt(args[0]);
+            Map clusterConf = Utils.findAndReadConfigFile(args[1]);
+            Helper.runOnClusterAndPrintMetrics(runSeconds, TOPOLOGY_NAME, clusterConf, getTopology(clusterConf));
+        } else {
+            System.err.println("args: runDurationSec topConfFile");
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/2a369e40/examples/storm-perf/src/main/java/org/apache/storm/perf/KafkaHdfsTopo.java
----------------------------------------------------------------------
diff --git a/examples/storm-perf/src/main/java/org/apache/storm/perf/KafkaHdfsTopo.java b/examples/storm-perf/src/main/java/org/apache/storm/perf/KafkaHdfsTopo.java
new file mode 100755
index 0000000..4293aac
--- /dev/null
+++ b/examples/storm-perf/src/main/java/org/apache/storm/perf/KafkaHdfsTopo.java
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License
+ */
+
+package org.apache.storm.perf;
+
+import org.apache.storm.generated.StormTopology;
+import org.apache.storm.hdfs.bolt.HdfsBolt;
+import org.apache.storm.hdfs.bolt.format.DefaultFileNameFormat;
+import org.apache.storm.hdfs.bolt.format.FileNameFormat;
+import org.apache.storm.hdfs.bolt.format.RecordFormat;
+import org.apache.storm.hdfs.bolt.rotation.FileRotationPolicy;
+import org.apache.storm.hdfs.bolt.rotation.FileSizeRotationPolicy;
+import org.apache.storm.hdfs.bolt.sync.CountSyncPolicy;
+import org.apache.storm.hdfs.bolt.sync.SyncPolicy;
+import org.apache.storm.kafka.BrokerHosts;
+import org.apache.storm.kafka.KafkaSpout;
+import org.apache.storm.kafka.SpoutConfig;
+import org.apache.storm.kafka.StringMultiSchemeWithTopic;
+import org.apache.storm.kafka.ZkHosts;
+import org.apache.storm.perf.utils.Helper;
+import org.apache.storm.topology.TopologyBuilder;
+import org.apache.storm.tuple.Tuple;
+import org.apache.storm.utils.Utils;
+
+import java.util.Map;
+import java.util.UUID;
+
+/***
+ * This topo helps measure speed of reading from Kafka and writing to Hdfs.
+ * Spout Reads from Kafka.
+ * Bolt writes to Hdfs
+ */
+
+public class KafkaHdfsTopo {
+
+ // configs - topo parallelism
+ public static final String SPOUT_NUM = "spout.count";
+ public static final String BOLT_NUM = "bolt.count";
+ // configs - kafka spout
+ public static final String KAFKA_TOPIC = "kafka.topic";
+ public static final String ZOOKEEPER_URI = "zk.uri";
+ // configs - hdfs bolt
+ public static final String HDFS_URI = "hdfs.uri";
+ public static final String HDFS_PATH = "hdfs.dir";
+ public static final String HDFS_BATCH = "hdfs.batch";
+
+
+ public static final int DEFAULT_SPOUT_NUM = 1;
+ public static final int DEFAULT_BOLT_NUM = 1;
+ public static final int DEFAULT_HDFS_BATCH = 1000;
+
+ // names
+ public static final String TOPOLOGY_NAME = "KafkaHdfsTopo";
+ public static final String SPOUT_ID = "kafkaSpout";
+ public static final String BOLT_ID = "hdfsBolt";
+
+
+
+ public static StormTopology getTopology(Map config) {
+
+ final int spoutNum = getInt(config, SPOUT_NUM, DEFAULT_SPOUT_NUM);
+ final int boltNum = getInt(config, BOLT_NUM, DEFAULT_BOLT_NUM);
+
+ final int hdfsBatch = getInt(config, HDFS_BATCH, DEFAULT_HDFS_BATCH);
+
+ // 1 - Setup Kafka Spout --------
+ String zkConnString = getStr(config, ZOOKEEPER_URI);
+ String topicName = getStr(config, KAFKA_TOPIC);
+
+ BrokerHosts brokerHosts = new ZkHosts(zkConnString);
+ SpoutConfig spoutConfig = new SpoutConfig(brokerHosts, topicName, "/" + topicName, UUID.randomUUID().toString());
+ spoutConfig.scheme = new StringMultiSchemeWithTopic();
+ spoutConfig.ignoreZkOffsets = true;
+
+ KafkaSpout spout = new KafkaSpout(spoutConfig);
+
+ // 2 - Setup HFS Bolt --------
+ String Hdfs_url = getStr(config, HDFS_URI);
+ RecordFormat format = new LineWriter("str");
+ SyncPolicy syncPolicy = new CountSyncPolicy(hdfsBatch);
+ FileRotationPolicy rotationPolicy = new FileSizeRotationPolicy(1.0f, FileSizeRotationPolicy.Units.GB);
+
+ FileNameFormat fileNameFormat = new DefaultFileNameFormat().withPath(getStr(config,HDFS_PATH) );
+
+ // Instantiate the HdfsBolt
+ HdfsBolt bolt = new HdfsBolt()
+ .withFsUrl(Hdfs_url)
+ .withFileNameFormat(fileNameFormat)
+ .withRecordFormat(format)
+ .withRotationPolicy(rotationPolicy)
+ .withSyncPolicy(syncPolicy);
+
+
+ // 3 - Setup Topology --------
+ TopologyBuilder builder = new TopologyBuilder();
+ builder.setSpout(SPOUT_ID, spout, spoutNum);
+ builder.setBolt(BOLT_ID, bolt, boltNum)
+ .localOrShuffleGrouping(SPOUT_ID);
+
+ return builder.createTopology();
+ }
+
+
+ public static int getInt(Map map, Object key, int def) {
+ return Utils.getInt(Utils.get(map, key, def));
+ }
+
+ public static String getStr(Map map, Object key) {
+ return (String) map.get(key);
+ }
+
+
+ /** Copies text file content from sourceDir to destinationDir. Moves source files into sourceDir after its done consuming */
+ public static void main(String[] args) throws Exception {
+
+ if (args.length != 2) {
+ System.err.println("args: runDurationSec topConfFile");
+ return;
+ }
+
+ Integer durationSec = Integer.parseInt(args[0]);
+ String confFile = args[1];
+ Map topoConf = Utils.findAndReadConfigFile(confFile);
+
+ // Submit topology to Storm cluster
+ Helper.runOnClusterAndPrintMetrics(durationSec, TOPOLOGY_NAME, topoConf, getTopology(topoConf));
+ }
+
+ public static class LineWriter implements RecordFormat {
+ private String lineDelimiter = System.lineSeparator();
+ private String fieldName;
+
+ public LineWriter(String fieldName) {
+ this.fieldName = fieldName;
+ }
+
+ /**
+ * Overrides the default record delimiter.
+ *
+ * @param delimiter
+ * @return
+ */
+ public LineWriter withLineDelimiter(String delimiter){
+ this.lineDelimiter = delimiter;
+ return this;
+ }
+
+ @Override
+ public byte[] format(Tuple tuple) {
+ return (tuple.getValueByField(fieldName).toString() + this.lineDelimiter).getBytes();
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/storm/blob/2a369e40/examples/storm-perf/src/main/java/org/apache/storm/perf/KafkaSpoutNullBoltTopo.java
----------------------------------------------------------------------
diff --git a/examples/storm-perf/src/main/java/org/apache/storm/perf/KafkaSpoutNullBoltTopo.java b/examples/storm-perf/src/main/java/org/apache/storm/perf/KafkaSpoutNullBoltTopo.java
new file mode 100755
index 0000000..3512c65
--- /dev/null
+++ b/examples/storm-perf/src/main/java/org/apache/storm/perf/KafkaSpoutNullBoltTopo.java
@@ -0,0 +1,114 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License
+*/
+
+package org.apache.storm.perf;
+
+import org.apache.storm.generated.StormTopology;
+import org.apache.storm.kafka.BrokerHosts;
+import org.apache.storm.kafka.KafkaSpout;
+import org.apache.storm.kafka.SpoutConfig;
+import org.apache.storm.kafka.StringMultiSchemeWithTopic;
+import org.apache.storm.kafka.ZkHosts;
+import org.apache.storm.perf.bolt.DevNullBolt;
+import org.apache.storm.perf.utils.Helper;
+import org.apache.storm.topology.TopologyBuilder;
+import org.apache.storm.utils.Utils;
+
+import java.util.Map;
+import java.util.UUID;
+
+
+/**
+ * This topo helps measure speed of reading from Kafka.
+ * Spout reads from Kafka.
+ * Bolt acks and discards tuples.
+ */
+public class KafkaSpoutNullBoltTopo {
+
+    // configs - topo parallelism
+    public static final String SPOUT_NUM = "spout.count";
+    public static final String BOLT_NUM = "bolt.count";
+
+    // configs - kafka spout
+    public static final String KAFKA_TOPIC = "kafka.topic";
+    public static final String ZOOKEEPER_URI = "zk.uri";
+
+    public static final int DEFAULT_SPOUT_NUM = 1;
+    public static final int DEFAULT_BOLT_NUM = 1;
+
+    // names
+    public static final String TOPOLOGY_NAME = "KafkaSpoutNullBoltTopo";
+    public static final String SPOUT_ID = "kafkaSpout";
+    public static final String BOLT_ID = "devNullBolt";
+
+    /**
+     * Wires kafkaSpout -> devNullBolt with localOrShuffle grouping.
+     *
+     * @param config settings providing the ZooKeeper URI, the Kafka topic and the component counts
+     * @return the assembled topology
+     */
+    public static StormTopology getTopology(Map config) {
+        final int spouts = getInt(config, SPOUT_NUM, DEFAULT_SPOUT_NUM);
+        final int sinks = getInt(config, BOLT_NUM, DEFAULT_BOLT_NUM);
+        KafkaSpout source = kafkaSpout(config);
+        DevNullBolt sink = new DevNullBolt();
+
+        TopologyBuilder topo = new TopologyBuilder();
+        topo.setSpout(SPOUT_ID, source, spouts);
+        topo.setBolt(BOLT_ID, sink, sinks).localOrShuffleGrouping(SPOUT_ID);
+        return topo.createTopology();
+    }
+
+    // Creates the Kafka spout for the configured ZooKeeper URI and topic.
+    private static KafkaSpout kafkaSpout(Map config) {
+        String zkUri = getStr(config, ZOOKEEPER_URI);
+        String topic = getStr(config, KAFKA_TOPIC);
+
+        BrokerHosts hosts = new ZkHosts(zkUri);
+        SpoutConfig kafkaConf = new SpoutConfig(hosts, topic, "/" + topic, UUID.randomUUID().toString());
+        kafkaConf.scheme = new StringMultiSchemeWithTopic();
+        kafkaConf.ignoreZkOffsets = true;
+        return new KafkaSpout(kafkaConf);
+    }
+
+    /** Reads {@code key} from {@code map} as an int, with {@code def} passed as the fallback. */
+    public static int getInt(Map map, Object key, int def) {
+        Object raw = Utils.get(map, key, def);
+        return Utils.getInt(raw);
+    }
+
+    /** Returns the value stored under {@code key}, cast to String. */
+    public static String getStr(Map map, Object key) {
+        Object raw = map.get(key);
+        return (String) raw;
+    }
+
+    /**
+     * Submits the Kafka-reading topology to a Storm cluster.
+     * Expects exactly two arguments: runDurationSec confFile.
+     */
+    public static void main(String[] args) throws Exception {
+        if (args.length == 2) {
+            Integer runSeconds = Integer.parseInt(args[0]);
+            Map clusterConf = Utils.findAndReadConfigFile(args[1]);
+            Helper.runOnClusterAndPrintMetrics(runSeconds, TOPOLOGY_NAME, clusterConf, getTopology(clusterConf));
+        } else {
+            System.err.println("args: runDurationSec confFile");
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/2a369e40/examples/storm-perf/src/main/java/org/apache/storm/perf/StrGenSpoutHdfsBoltTopo.java
----------------------------------------------------------------------
diff --git a/examples/storm-perf/src/main/java/org/apache/storm/perf/StrGenSpoutHdfsBoltTopo.java b/examples/storm-perf/src/main/java/org/apache/storm/perf/StrGenSpoutHdfsBoltTopo.java
new file mode 100755
index 0000000..5b97540
--- /dev/null
+++ b/examples/storm-perf/src/main/java/org/apache/storm/perf/StrGenSpoutHdfsBoltTopo.java
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License
+ */
+
+
+package org.apache.storm.perf;
+
+import org.apache.storm.LocalCluster;
+import org.apache.storm.generated.StormTopology;
+import org.apache.storm.hdfs.bolt.HdfsBolt;
+import org.apache.storm.hdfs.bolt.format.DefaultFileNameFormat;
+import org.apache.storm.hdfs.bolt.format.FileNameFormat;
+import org.apache.storm.hdfs.bolt.format.RecordFormat;
+import org.apache.storm.hdfs.bolt.rotation.FileRotationPolicy;
+import org.apache.storm.hdfs.bolt.rotation.FileSizeRotationPolicy;
+import org.apache.storm.hdfs.bolt.sync.CountSyncPolicy;
+import org.apache.storm.hdfs.bolt.sync.SyncPolicy;
+import org.apache.storm.perf.spout.StringGenSpout;
+import org.apache.storm.perf.utils.Helper;
+import org.apache.storm.topology.TopologyBuilder;
+import org.apache.storm.tuple.Tuple;
+import org.apache.storm.utils.Utils;
+
+import java.util.Map;
+
+/**
+ * This topo helps measure speed of writing to Hdfs.
+ * Spout generates fixed length random strings.
+ * Bolt writes to Hdfs.
+ */
+public class StrGenSpoutHdfsBoltTopo {
+
+    // configs - topo parallelism
+    public static final String SPOUT_NUM = "spout.count";
+    public static final String BOLT_NUM = "bolt.count";
+
+    // configs - hdfs bolt
+    public static final String HDFS_URI = "hdfs.uri";
+    public static final String HDFS_PATH = "hdfs.dir";
+    public static final String HDFS_BATCH = "hdfs.batch";
+
+    public static final int DEFAULT_SPOUT_NUM = 1;
+    public static final int DEFAULT_BOLT_NUM = 1;
+    public static final int DEFAULT_HDFS_BATCH = 1000;
+
+    // names
+    public static final String TOPOLOGY_NAME = "StrGenSpoutHdfsBoltTopo";
+    public static final String SPOUT_ID = "GenSpout";
+    public static final String BOLT_ID = "hdfsBolt";
+
+
+    /**
+     * Builds the topology: a string generating spout feeding an HDFS bolt
+     * through a local-or-shuffle grouping.
+     *
+     * @param topoConf config map supplying parallelism, HDFS uri/dir and sync batch size
+     * @return the assembled topology
+     */
+    public static StormTopology getTopology(Map topoConf) {
+        final int hdfsBatch = Helper.getInt(topoConf, HDFS_BATCH, DEFAULT_HDFS_BATCH);
+
+        // 1 -  Setup StringGen Spout   --------
+        // NOTE(review): 100 is presumably the generated string length - confirm against StringGenSpout
+        StringGenSpout spout = new StringGenSpout(100).withFieldName("str");
+
+
+        // 2 -  Setup HDFS Bolt   --------
+        String hdfsUrl = Helper.getStr(topoConf, HDFS_URI);
+        RecordFormat format = new LineWriter("str");
+        SyncPolicy syncPolicy = new CountSyncPolicy(hdfsBatch);
+        FileRotationPolicy rotationPolicy = new FileSizeRotationPolicy(1.0f, FileSizeRotationPolicy.Units.GB);
+        final int spoutNum = Helper.getInt(topoConf, SPOUT_NUM, DEFAULT_SPOUT_NUM);
+        final int boltNum = Helper.getInt(topoConf, BOLT_NUM, DEFAULT_BOLT_NUM);
+
+        // Use default, Storm-generated file names
+        FileNameFormat fileNameFormat = new DefaultFileNameFormat().withPath(Helper.getStr(topoConf, HDFS_PATH));
+
+        // Instantiate the HdfsBolt
+        HdfsBolt bolt = new HdfsBolt()
+                .withFsUrl(hdfsUrl)
+                .withFileNameFormat(fileNameFormat)
+                .withRecordFormat(format)
+                .withRotationPolicy(rotationPolicy)
+                .withSyncPolicy(syncPolicy);
+
+
+        // 3 -  Setup Topology  --------
+
+        TopologyBuilder builder = new TopologyBuilder();
+        builder.setSpout(SPOUT_ID, spout, spoutNum);
+        builder.setBolt(BOLT_ID, bolt, boltNum)
+                .localOrShuffleGrouping(SPOUT_ID);
+
+        return builder.createTopology();
+    }
+
+
+    /**
+     * Spout generates random strings and HDFS bolt writes them to a text file.
+     * <p>
+     * With no arguments the topology runs on a local cluster, configured from
+     * {@code conf/HdfsSpoutTopo.yaml}, until the process is interrupted.
+     * Otherwise exactly two arguments are expected ({@code runDurationSec confFile})
+     * and the topology is submitted to a Storm cluster.
+     */
+    public static void main(String[] args) throws Exception {
+        if (args.length <= 0) {
+            // submit to local cluster
+            Map topoConf = Utils.findAndReadConfigFile("conf/HdfsSpoutTopo.yaml");
+            LocalCluster cluster = Helper.runOnLocalCluster(TOPOLOGY_NAME, getTopology(topoConf));
+
+            Helper.setupShutdownHook(cluster, TOPOLOGY_NAME);
+            while (true) { // run indefinitely till Ctrl-C
+                Thread.sleep(20_000_000);
+            }
+        } else {
+            // Submit to Storm cluster
+            if (args.length != 2) {
+                System.err.println("args: runDurationSec confFile");
+                return;
+            }
+            int durationSec = Integer.parseInt(args[0]);
+            Map topoConf = Utils.findAndReadConfigFile(args[1]);
+
+            Helper.runOnClusterAndPrintMetrics(durationSec, TOPOLOGY_NAME, topoConf, getTopology(topoConf));
+        }
+    }
+
+
+    /**
+     * Record format that writes a single tuple field followed by a line delimiter.
+     */
+    public static class LineWriter implements RecordFormat {
+        private String lineDelimiter = System.lineSeparator();
+        private final String fieldName;
+
+        /**
+         * @param fieldName name of the tuple field whose value is written
+         */
+        public LineWriter(String fieldName) {
+            this.fieldName = fieldName;
+        }
+
+        /**
+         * Overrides the default record delimiter.
+         *
+         * @param delimiter text appended after every record
+         * @return this instance, for chaining
+         */
+        public LineWriter withLineDelimiter(String delimiter) {
+            this.lineDelimiter = delimiter;
+            return this;
+        }
+
+        /**
+         * Renders the configured field of the tuple plus the delimiter as bytes.
+         * The charset is fixed to UTF-8 so the output does not depend on the
+         * JVM's platform default encoding.
+         *
+         * @param tuple tuple to render
+         * @return UTF-8 bytes of the field's string form followed by the delimiter
+         */
+        @Override
+        public byte[] format(Tuple tuple) {
+            String line = tuple.getValueByField(fieldName).toString() + this.lineDelimiter;
+            return line.getBytes(java.nio.charset.StandardCharsets.UTF_8);
+        }
+    }
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/storm/blob/2a369e40/examples/storm-perf/src/main/java/org/apache/storm/perf/bolt/CountBolt.java
----------------------------------------------------------------------
diff --git a/examples/storm-perf/src/main/java/org/apache/storm/perf/bolt/CountBolt.java b/examples/storm-perf/src/main/java/org/apache/storm/perf/bolt/CountBolt.java
new file mode 100644
index 0000000..b79a0ee
--- /dev/null
+++ b/examples/storm-perf/src/main/java/org/apache/storm/perf/bolt/CountBolt.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License
+ */
+
+package org.apache.storm.perf.bolt;
+
+
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.topology.BasicOutputCollector;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.topology.base.BaseBasicBolt;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Tuple;
+import org.apache.storm.tuple.Values;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Keeps an in-memory running count per word (field 0 of each tuple) and
+ * emits the word together with its updated count.
+ */
+public class CountBolt extends BaseBasicBolt {
+    public static final String FIELDS_WORD = "word";
+    public static final String FIELDS_COUNT = "count";
+
+    Map<String, Integer> counts = new HashMap<>();
+
+    @Override
+    public void prepare(Map stormConf, TopologyContext context) {
+        // no setup required
+    }
+
+    @Override
+    public void execute(Tuple tuple, BasicOutputCollector collector) {
+        final String word = tuple.getString(0);
+        final Integer previous = counts.get(word);
+        // first sighting of a word starts at 1
+        final Integer updated = (previous == null) ? 1 : previous + 1;
+        counts.put(word, updated);
+        collector.emit(new Values(word, updated));
+    }
+
+    @Override
+    public void declareOutputFields(OutputFieldsDeclarer declarer) {
+        Fields outputFields = new Fields(FIELDS_WORD, FIELDS_COUNT);
+        declarer.declare(outputFields);
+    }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/2a369e40/examples/storm-perf/src/main/java/org/apache/storm/perf/bolt/DevNullBolt.java
----------------------------------------------------------------------
diff --git a/examples/storm-perf/src/main/java/org/apache/storm/perf/bolt/DevNullBolt.java b/examples/storm-perf/src/main/java/org/apache/storm/perf/bolt/DevNullBolt.java
new file mode 100755
index 0000000..b85ce15
--- /dev/null
+++ b/examples/storm-perf/src/main/java/org/apache/storm/perf/bolt/DevNullBolt.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License
+ */
+
+package org.apache.storm.perf.bolt;
+
+import org.apache.storm.task.OutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.topology.base.BaseRichBolt;
+import org.apache.storm.tuple.Tuple;
+
+import java.util.Map;
+
+
+/**
+ * Sink bolt: acks every tuple it receives and declares no output fields.
+ */
+public class DevNullBolt extends BaseRichBolt {
+    private OutputCollector sink;
+
+    @Override
+    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
+        sink = collector;
+    }
+
+    @Override
+    public void execute(Tuple tuple) {
+        // nothing is processed; the tuple is only acknowledged
+        sink.ack(tuple);
+    }
+
+    @Override
+    public void declareOutputFields(OutputFieldsDeclarer declarer) {
+        // intentionally empty: this bolt emits nothing
+    }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/2a369e40/examples/storm-perf/src/main/java/org/apache/storm/perf/bolt/IdBolt.java
----------------------------------------------------------------------
diff --git a/examples/storm-perf/src/main/java/org/apache/storm/perf/bolt/IdBolt.java b/examples/storm-perf/src/main/java/org/apache/storm/perf/bolt/IdBolt.java
new file mode 100644
index 0000000..116265e
--- /dev/null
+++ b/examples/storm-perf/src/main/java/org/apache/storm/perf/bolt/IdBolt.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License
+ */
+
+package org.apache.storm.perf.bolt;
+
+import org.apache.storm.task.OutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.topology.base.BaseRichBolt;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Tuple;
+import org.apache.storm.tuple.Values;
+
+import java.util.Map;
+
+public class IdBolt extends BaseRichBolt {
+ private OutputCollector collector;
+
+ @Override
+ public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
+ this.collector = collector;
+ }
+
+ @Override
+ public void execute(Tuple tuple) {
+ collector.emit(tuple, new Values( tuple.getValues() ) );
+ collector.ack(tuple);
+ }
+
+ @Override
+ public void declareOutputFields(OutputFieldsDeclarer declarer) {
+ declarer.declare(new Fields("field1"));
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/storm/blob/2a369e40/examples/storm-perf/src/main/java/org/apache/storm/perf/bolt/SplitSentenceBolt.java
----------------------------------------------------------------------
diff --git a/examples/storm-perf/src/main/java/org/apache/storm/perf/bolt/SplitSentenceBolt.java b/examples/storm-perf/src/main/java/org/apache/storm/perf/bolt/SplitSentenceBolt.java
new file mode 100644
index 0000000..96f9f73
--- /dev/null
+++ b/examples/storm-perf/src/main/java/org/apache/storm/perf/bolt/SplitSentenceBolt.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License
+ */
+
+package org.apache.storm.perf.bolt;
+
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.topology.BasicOutputCollector;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.topology.base.BaseBasicBolt;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Tuple;
+import org.apache.storm.tuple.Values;
+
+import java.util.Map;
+
+
+public class SplitSentenceBolt extends BaseBasicBolt {
+ public static final String FIELDS = "word";
+
+ @Override
+ public void prepare(Map stormConf, TopologyContext context) {
+ }
+
+ @Override
+ public void execute(Tuple input, BasicOutputCollector collector) {
+ for (String word : splitSentence(input.getString(0))) {
+ collector.emit(new Values(word));
+ }
+ }
+
+ @Override
+ public void declareOutputFields(OutputFieldsDeclarer declarer) {
+ declarer.declare(new Fields(FIELDS));
+ }
+
+
+ public static String[] splitSentence(String sentence) {
+ if (sentence != null) {
+ return sentence.split("\\s+");
+ }
+ return null;
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/2a369e40/examples/storm-perf/src/main/java/org/apache/storm/perf/spout/ConstSpout.java
----------------------------------------------------------------------
diff --git a/examples/storm-perf/src/main/java/org/apache/storm/perf/spout/ConstSpout.java b/examples/storm-perf/src/main/java/org/apache/storm/perf/spout/ConstSpout.java
new file mode 100755
index 0000000..b66e4f3
--- /dev/null
+++ b/examples/storm-perf/src/main/java/org/apache/storm/perf/spout/ConstSpout.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License
+ */
+
+package org.apache.storm.perf.spout;
+
+
+import org.apache.storm.spout.SpoutOutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.topology.base.BaseRichSpout;
+import org.apache.storm.tuple.Fields;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+public class ConstSpout extends BaseRichSpout {
+
+ private static final String DEFAUT_FIELD_NAME = "str";
+ private String value;
+ private String fieldName = DEFAUT_FIELD_NAME;
+ private SpoutOutputCollector collector = null;
+ private int count=0;
+
+ public ConstSpout(String value) {
+ this.value = value;
+ }
+
+ public ConstSpout withOutputFields(String fieldName) {
+ this.fieldName = fieldName;
+ return this;
+ }
+
+ @Override
+ public void declareOutputFields(OutputFieldsDeclarer declarer) {
+ declarer.declare(new Fields(fieldName));
+ }
+
+ @Override
+ public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
+ this.collector = collector;
+ }
+
+ @Override
+ public void nextTuple() {
+ List<Object> tuple = Collections.singletonList((Object) value);
+ collector.emit(tuple, count++);
+ }
+
+ @Override
+ public void ack(Object msgId) {
+ super.ack(msgId);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/2a369e40/examples/storm-perf/src/main/java/org/apache/storm/perf/spout/FileReadSpout.java
----------------------------------------------------------------------
diff --git a/examples/storm-perf/src/main/java/org/apache/storm/perf/spout/FileReadSpout.java b/examples/storm-perf/src/main/java/org/apache/storm/perf/spout/FileReadSpout.java
new file mode 100644
index 0000000..959e7c6
--- /dev/null
+++ b/examples/storm-perf/src/main/java/org/apache/storm/perf/spout/FileReadSpout.java
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License
+ */
+
+package org.apache.storm.perf.spout;
+
+import org.apache.storm.spout.SpoutOutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.topology.base.BaseRichSpout;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Values;
+
+import java.io.BufferedReader;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Spout that loads a text file into memory and emits its lines one per
+ * {@code nextTuple} call, wrapping around to the first line after the last.
+ * When acking is enabled each tuple is emitted with an incrementing message id.
+ */
+public class FileReadSpout extends BaseRichSpout {
+    public static final String FIELDS = "sentence";
+    private static final long serialVersionUID = -2582705611472467172L;
+    private transient FileReader reader;
+    private String file;
+    private boolean ackEnabled = true;
+    private SpoutOutputCollector collector;
+
+    private long count = 0;
+
+
+    /**
+     * @param file path of the text file whose lines are emitted
+     */
+    public FileReadSpout(String file) {
+        this.file = file;
+    }
+
+    // For testing
+    FileReadSpout(FileReader reader) {
+        this.reader = reader;
+    }
+
+    @Override
+    public void open(Map conf, TopologyContext context,
+                     SpoutOutputCollector collector) {
+        this.collector = collector;
+        Object ackObj = conf.get("topology.acker.executors");
+        // Compare numerically: Integer.equals/Long.equals are type-sensitive, so a
+        // zero that arrives as a Long would never match the Integer literal 0.
+        if (ackObj instanceof Number && ((Number) ackObj).longValue() == 0L) {
+            this.ackEnabled = false;
+        }
+        // for tests, reader will not be null
+        if (this.reader == null) {
+            this.reader = new FileReader(this.file);
+        }
+    }
+
+    @Override
+    public void nextTuple() {
+        if (ackEnabled) {
+            collector.emit(new Values(reader.nextLine()), count);
+            count++;
+        } else {
+            collector.emit(new Values(reader.nextLine()));
+        }
+    }
+
+    @Override
+    public void declareOutputFields(OutputFieldsDeclarer declarer) {
+        declarer.declare(new Fields(FIELDS));
+    }
+
+    /**
+     * Reads every line from the stream, decoding it as UTF-8, and closes the stream.
+     *
+     * @param input stream to read; closed before this method returns
+     * @return the lines in order, without line terminators
+     * @throws RuntimeException wrapping the IOException if reading or closing fails
+     */
+    public static List<String> readLines(InputStream input) {
+        List<String> lines = new ArrayList<>();
+        try (BufferedReader reader = new BufferedReader(
+                new InputStreamReader(input, java.nio.charset.StandardCharsets.UTF_8))) {
+            String line;
+            while ((line = reader.readLine()) != null) {
+                lines.add(line);
+            }
+        } catch (IOException e) {
+            throw new RuntimeException("Reading file failed", e);
+        }
+        return lines;
+    }
+
+    /**
+     * Holds the lines of a file in memory and hands them out in a loop.
+     */
+    public static class FileReader implements Serializable {
+
+        private static final long serialVersionUID = -7012334600647556267L;
+
+        public final String file;
+        private List<String> contents = null;
+        private int index = 0;
+        private int limit = 0;
+
+        /**
+         * Loads all lines of the file eagerly.
+         *
+         * @param file path of the file to load
+         * @throws IllegalArgumentException if {@code file} is null, cannot be opened,
+         *                                  or contains no lines
+         */
+        public FileReader(String file) {
+            this.file = file;
+            if (this.file == null) {
+                throw new IllegalArgumentException("file name cannot be null");
+            }
+            try {
+                this.contents = readLines(new FileInputStream(this.file));
+            } catch (IOException e) {
+                throw new IllegalArgumentException("Cannot open file " + file, e);
+            }
+            if (this.contents.isEmpty()) {
+                // fail here with a clear message instead of an index error in nextLine()
+                throw new IllegalArgumentException("File has no lines " + file);
+            }
+            this.limit = contents.size();
+        }
+
+        /**
+         * @return the next line, restarting from the first line once the end is reached
+         */
+        public String nextLine() {
+            if (index >= limit) {
+                index = 0;
+            }
+            String line = contents.get(index);
+            index++;
+            return line;
+        }
+
+    }
+}