You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by ka...@apache.org on 2017/04/05 22:34:07 UTC

[11/14] storm git commit: STORM-2416 Release Packaging Improvements

http://git-wip-us.apache.org/repos/asf/storm/blob/2a369e40/examples/storm-jms-examples/src/main/java/org/apache/storm/jms/example/SpringJmsProvider.java
----------------------------------------------------------------------
diff --git a/examples/storm-jms-examples/src/main/java/org/apache/storm/jms/example/SpringJmsProvider.java b/examples/storm-jms-examples/src/main/java/org/apache/storm/jms/example/SpringJmsProvider.java
new file mode 100644
index 0000000..306fc25
--- /dev/null
+++ b/examples/storm-jms-examples/src/main/java/org/apache/storm/jms/example/SpringJmsProvider.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.jms.example;
+
+import javax.jms.ConnectionFactory;
+import javax.jms.Destination;
+
+import org.springframework.context.ApplicationContext;
+import org.springframework.context.support.ClassPathXmlApplicationContext;
+
+import org.apache.storm.jms.JmsProvider;
+
+
+/**
+ * A <code>JmsProvider</code> that uses the spring framework
+ * to obtain a JMS <code>ConnectionFactory</code> and 
+ * <code>Desitnation</code> objects.
+ * <p/>
+ * The constructor takes three arguments:
+ * <ol>
+ * <li>A string pointing to the the spring application context file contining the JMS configuration
+ * (must be on the classpath)
+ * </li>
+ * <li>The name of the connection factory bean</li>
+ * <li>The name of the destination bean</li>
+ * </ol>
+ * 
+ *
+ *
+ */
+@SuppressWarnings("serial")
+public class SpringJmsProvider implements JmsProvider {
+	private ConnectionFactory connectionFactory;
+	private Destination destination;
+	
+	/**
+	 * Constructs a <code>SpringJmsProvider</code> object given the name of a
+	 * classpath resource (the spring application context file), and the bean
+	 * names of a JMS connection factory and destination.
+	 * 
+	 * @param appContextClasspathResource - the spring configuration file (classpath resource)
+	 * @param connectionFactoryBean - the JMS connection factory bean name
+	 * @param destinationBean - the JMS destination bean name
+	 */
+	public SpringJmsProvider(String appContextClasspathResource, String connectionFactoryBean, String destinationBean){
+		ApplicationContext context = new ClassPathXmlApplicationContext(appContextClasspathResource);
+		this.connectionFactory = (ConnectionFactory)context.getBean(connectionFactoryBean);
+		this.destination = (Destination)context.getBean(destinationBean);
+	}
+
+	public ConnectionFactory connectionFactory() throws Exception {
+		return this.connectionFactory;
+	}
+
+	public Destination destination() throws Exception {
+		return this.destination;
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/2a369e40/examples/storm-jms-examples/src/main/resources/jms-activemq.xml
----------------------------------------------------------------------
diff --git a/examples/storm-jms-examples/src/main/resources/jms-activemq.xml b/examples/storm-jms-examples/src/main/resources/jms-activemq.xml
new file mode 100644
index 0000000..1a845b8
--- /dev/null
+++ b/examples/storm-jms-examples/src/main/resources/jms-activemq.xml
@@ -0,0 +1,53 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements.  See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License.  You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<beans 
+  xmlns="http://www.springframework.org/schema/beans" 
+  xmlns:amq="http://activemq.apache.org/schema/core"
+  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+  xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.0.xsd
+  http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd">
+
+	<!-- ActiveMQ -->
+
+	<!-- embedded ActiveMQ Broker -->
+	<!-- <amq:broker useJmx="false" persistent="false">
+		<amq:transportConnectors>
+			<amq:transportConnector uri="tcp://localhost:61616" />
+		</amq:transportConnectors>
+	</amq:broker> -->
+
+	<amq:queue id="notificationQueue" physicalName="backtype.storm.contrib.example.queue" />
+	
+	<amq:topic id="notificationTopic" physicalName="backtype.storm.contrib.example.topic" />
+
+	<amq:connectionFactory id="jmsConnectionFactory"
+		brokerURL="tcp://localhost:61616" />
+
+	<!-- <bean id="queueTemplate" class="org.springframework.jms.core.JmsTemplate">
+		<property name="connectionFactory">
+			<ref bean="jmsConnectionFactory" />
+		</property>
+		<property name="pubSubDomain" value="false" />
+	</bean> -->
+	
+</beans>
+
+
+
+
+

http://git-wip-us.apache.org/repos/asf/storm/blob/2a369e40/examples/storm-jms-examples/src/main/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/examples/storm-jms-examples/src/main/resources/log4j.properties b/examples/storm-jms-examples/src/main/resources/log4j.properties
new file mode 100644
index 0000000..079b195
--- /dev/null
+++ b/examples/storm-jms-examples/src/main/resources/log4j.properties
@@ -0,0 +1,29 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+log4j.rootLogger=INFO, stdout
+
+log4j.appender.stdout=org.apache.log4j.ConsoleAppender
+log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
+
+log4j.appender.stdout.layout.ConversionPattern=%5p (%C:%L) - %m%n
+
+
+log4j.logger.backtype.storm.contrib=DEBUG
+log4j.logger.clojure.contrib=WARN
+log4j.logger.org.springframework=WARN
+log4j.logger.org.apache.zookeeper=WARN
+

http://git-wip-us.apache.org/repos/asf/storm/blob/2a369e40/examples/storm-kafka-client-examples/pom.xml
----------------------------------------------------------------------
diff --git a/examples/storm-kafka-client-examples/pom.xml b/examples/storm-kafka-client-examples/pom.xml
index 1c4449b..554818b 100644
--- a/examples/storm-kafka-client-examples/pom.xml
+++ b/examples/storm-kafka-client-examples/pom.xml
@@ -31,20 +31,6 @@
 
     <artifactId>storm-kafka-client-examples</artifactId>
 
-    <properties>
-        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
-        <provided.scope>provided</provided.scope>
-    </properties>
-
-    <profiles>
-        <profile>
-            <id>intellij</id>
-            <properties>
-                <provided.scope>compile</provided.scope>
-            </properties>
-        </profile>
-    </profiles>
-
     <dependencies>
         <dependency>
             <groupId>org.apache.storm</groupId>

http://git-wip-us.apache.org/repos/asf/storm/blob/2a369e40/examples/storm-kafka-examples/pom.xml
----------------------------------------------------------------------
diff --git a/examples/storm-kafka-examples/pom.xml b/examples/storm-kafka-examples/pom.xml
index 8c1e74d..65eeda6 100644
--- a/examples/storm-kafka-examples/pom.xml
+++ b/examples/storm-kafka-examples/pom.xml
@@ -27,20 +27,6 @@
 
     <artifactId>storm-kafka-examples</artifactId>
 
-    <properties>
-        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
-        <provided.scope>provided</provided.scope>
-    </properties>
-
-    <profiles>
-        <profile>
-            <id>intellij</id>
-            <properties>
-                <provided.scope>compile</provided.scope>
-            </properties>
-        </profile>
-    </profiles>
-
     <dependencies>
         <dependency>
             <groupId>org.apache.storm</groupId>

http://git-wip-us.apache.org/repos/asf/storm/blob/2a369e40/examples/storm-mongodb-examples/pom.xml
----------------------------------------------------------------------
diff --git a/examples/storm-mongodb-examples/pom.xml b/examples/storm-mongodb-examples/pom.xml
index 6def943..297a6e8 100644
--- a/examples/storm-mongodb-examples/pom.xml
+++ b/examples/storm-mongodb-examples/pom.xml
@@ -27,20 +27,6 @@
 
     <artifactId>storm-mongodb-examples</artifactId>
 
-    <properties>
-        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
-        <provided.scope>provided</provided.scope>
-    </properties>
-
-    <profiles>
-        <profile>
-            <id>intellij</id>
-            <properties>
-                <provided.scope>compile</provided.scope>
-            </properties>
-        </profile>
-    </profiles>
-
     <dependencies>
         <dependency>
             <groupId>org.apache.storm</groupId>

http://git-wip-us.apache.org/repos/asf/storm/blob/2a369e40/examples/storm-mqtt-examples/pom.xml
----------------------------------------------------------------------
diff --git a/examples/storm-mqtt-examples/pom.xml b/examples/storm-mqtt-examples/pom.xml
index ab061c4..ca2d03b 100644
--- a/examples/storm-mqtt-examples/pom.xml
+++ b/examples/storm-mqtt-examples/pom.xml
@@ -31,20 +31,6 @@
     <relativePath>../../pom.xml</relativePath>
   </parent>
 
-  <properties>
-    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
-    <provided.scope>provided</provided.scope>
-  </properties>
-
-  <profiles>
-    <profile>
-      <id>intellij</id>
-      <properties>
-        <provided.scope>compile</provided.scope>
-      </properties>
-    </profile>
-  </profiles>
-
   <dependencies>
    <dependency>
       <groupId>org.apache.storm</groupId>

http://git-wip-us.apache.org/repos/asf/storm/blob/2a369e40/examples/storm-opentsdb-examples/pom.xml
----------------------------------------------------------------------
diff --git a/examples/storm-opentsdb-examples/pom.xml b/examples/storm-opentsdb-examples/pom.xml
index 2a24f8d..9ffd12f 100644
--- a/examples/storm-opentsdb-examples/pom.xml
+++ b/examples/storm-opentsdb-examples/pom.xml
@@ -27,20 +27,6 @@
 
     <artifactId>storm-opentsdb-examples</artifactId>
 
-    <properties>
-        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
-        <provided.scope>provided</provided.scope>
-    </properties>
-
-    <profiles>
-        <profile>
-            <id>intellij</id>
-            <properties>
-                <provided.scope>compile</provided.scope>
-            </properties>
-        </profile>
-    </profiles>
-
     <dependencies>
         <dependency>
             <groupId>org.apache.storm</groupId>

http://git-wip-us.apache.org/repos/asf/storm/blob/2a369e40/examples/storm-perf/README.markdown
----------------------------------------------------------------------
diff --git a/examples/storm-perf/README.markdown b/examples/storm-perf/README.markdown
new file mode 100644
index 0000000..946ab21
--- /dev/null
+++ b/examples/storm-perf/README.markdown
@@ -0,0 +1,50 @@
+# Topologies for measuring Storm performance
+
+This module includes topologies designed for measuring Storm performance.
+
+## Overview
+There are two basic modes for running these topologies
+
+- **Cluster mode:** Submits the topology to a storm cluster. This mode is useful for benchmarking. It calculates throughput and latency numbers every minute and prints them on the console.
+- **In-process mode:** Uses LocalCluster to run topology. This mode helps identify bottlenecks using profilers like JProfiler from within a IDE. This mode does not print metrics.
+
+In both the modes, a shutdown hook is setup to terminate the topology when the program that is submitting the topology is terminated.
+
+The bundled topologies can be classified into two types.
+
+- Topologies that measure purely the internal functioning of Storm. Such topologies do not interact with external systems like Kafka or Hdfs.
+- Topologies that measure speed of I/O with external systems like Kafka and Hdfs.
+
+Topologies that measure internal performance can be run in either in-proc or cluster modes.
+Topologies that measure I/O with external systems are designed to run in cluster mode only.
+
+## Topologies List
+
+1. **ConstSpoutOnlyTopo:** Helps measure how fast spout can emit. This topology has a spout and is not connected to any bolts. Supports in-proc and cluster mode.
+2. **ConstSpoutNullBoltTopo:** Helps measure how fast spout can send data to a bolt. Spout emits a stream of constant values to a DevNull bolt which discards the incoming tuples. Supports in-proc and cluster mode.
+3. **ConstSpoutIdBoltNullBoltTopo:** Helps measure speed of messaging between spouts and bolts. Spout emits a stream of constant values to an ID bolt which clones the tuple and emits it downstream to a DevNull bolt. Supports in-proc and cluster mode.
+4. **FileReadWordCount:** Measures speed of word counting. The spout loads a file into memory and emits these lines in an infinite loop. Supports in-proc and cluster mode.
+5. **HdfsSpoutNullBolt:** Measures speed at which HdfsSpout can read from HDFS. Supports cluster mode only.
+6. **StrGenSpoutHdfsBoltTopo:** Measures speed at which HdfsBolt can write to HDFS. Supports cluster mode only.
+7. **KafkaSpoutNullBolt:** Measures speed at which KafkaSpout can read from Kafka. Supports cluster mode only.
+8. **KafkaHdfsTopo:** Measures how fast Storm can read from Kafka and write to HDFS.
+
+
+## How to run ?
+
+### In-process mode:
+This mode is intended for running the topology quickly and easily from within the IDE and does not expect any command line arguments.
+Simply running the Topology's main() method without any arguments will get it running. The topology runs indefinitely till the program is terminated.
+
+
+### Cluster mode:
+When the topology is run with one or more than one cmd line arguments, the topology is submitted to the cluster.
+The first argument indicates how long the topology should be run. Often the second argument refers to a yaml config
+file which contains topology configuration settings. The conf/ directory in this module contains sample config files
+with names matching the corresponding topology.
+
+These topologies can be run using the standard storm jar command.
+
+```
+bin/storm jar  /path/storm-perf-1.1.0-jar-with-dependencies.jar org.apache.storm.perf.ConstSpoutNullBoltTopo  200  conf/ConstSpoutIdBoltNullBoltTopo.yaml
+```
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/storm/blob/2a369e40/examples/storm-perf/pom.xml
----------------------------------------------------------------------
diff --git a/examples/storm-perf/pom.xml b/examples/storm-perf/pom.xml
new file mode 100644
index 0000000..64ae889
--- /dev/null
+++ b/examples/storm-perf/pom.xml
@@ -0,0 +1,107 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements.  See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License.  You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+    <parent>
+        <artifactId>storm</artifactId>
+        <groupId>org.apache.storm</groupId>
+        <version>2.0.0-SNAPSHOT</version>
+        <relativePath>../../pom.xml</relativePath>
+    </parent>
+
+    <groupId>org.apache.storm</groupId>
+    <artifactId>storm-perf</artifactId>
+    <packaging>jar</packaging>
+    <name>Storm Perf</name>
+    <description>Topologies and tools to measure performance.</description>
+
+    <build>
+        <plugins>
+            <plugin>
+                <artifactId>maven-assembly-plugin</artifactId>
+                <configuration>
+                    <descriptorRefs>
+                        <descriptorRef>jar-with-dependencies</descriptorRef>
+                    </descriptorRefs>
+                    <archive>
+                        <manifest>
+                            <mainClass />
+                        </manifest>
+                    </archive>
+                </configuration>
+                <executions>
+                    <execution>
+                        <id>make-assembly</id>
+                        <phase>package</phase>
+                        <goals>
+                            <goal>single</goal>
+                        </goals>
+                    </execution>
+                </executions>
+
+            </plugin>
+
+            <plugin>
+                <groupId>org.codehaus.mojo</groupId>
+                <artifactId>exec-maven-plugin</artifactId>
+                <version>1.2.1</version>
+                <executions>
+                    <execution>
+                        <goals>
+                            <goal>exec</goal>
+                        </goals>
+                    </execution>
+                </executions>
+                <configuration>
+                    <executable>java</executable>
+                    <includeProjectDependencies>true</includeProjectDependencies>
+                    <includePluginDependencies>false</includePluginDependencies>
+                    <classpathScope>compile</classpathScope>
+                    <mainClass>${storm.topology}</mainClass>
+                </configuration>
+            </plugin>
+        </plugins>
+    </build>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.storm</groupId>
+            <artifactId>storm-core</artifactId>
+            <version>${project.version}</version>
+            <!--
+              Use "provided" scope to keep storm out of the jar-with-dependencies
+              For IntelliJ dev, intellij will load properly.
+            -->
+            <scope>${provided.scope}</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.storm</groupId>
+            <artifactId>storm-kafka</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.storm</groupId>
+            <artifactId>storm-hdfs</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+    </dependencies>
+
+
+</project>

http://git-wip-us.apache.org/repos/asf/storm/blob/2a369e40/examples/storm-perf/src/main/conf/ConstSpoutIdBoltNullBoltTopo.yaml
----------------------------------------------------------------------
diff --git a/examples/storm-perf/src/main/conf/ConstSpoutIdBoltNullBoltTopo.yaml b/examples/storm-perf/src/main/conf/ConstSpoutIdBoltNullBoltTopo.yaml
new file mode 100644
index 0000000..9f74aee
--- /dev/null
+++ b/examples/storm-perf/src/main/conf/ConstSpoutIdBoltNullBoltTopo.yaml
@@ -0,0 +1,22 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+spout.count : 1
+bolt1.count : 1  # IdBolt instances
+bolt2.count : 1  # DevNullBolt instances
+
+# storm config overrides
+topology.workers : 1
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/storm/blob/2a369e40/examples/storm-perf/src/main/conf/ConstSpoutNullBoltTopo.yaml
----------------------------------------------------------------------
diff --git a/examples/storm-perf/src/main/conf/ConstSpoutNullBoltTopo.yaml b/examples/storm-perf/src/main/conf/ConstSpoutNullBoltTopo.yaml
new file mode 100644
index 0000000..51f2dd7
--- /dev/null
+++ b/examples/storm-perf/src/main/conf/ConstSpoutNullBoltTopo.yaml
@@ -0,0 +1,22 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+spout.count : 1
+bolt.count : 1
+grouping : "local"  # either  "shuffle" or "local"
+
+# storm config overrides
+topology.workers : 1
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/storm/blob/2a369e40/examples/storm-perf/src/main/conf/FileReadWordCountTopo.yaml
----------------------------------------------------------------------
diff --git a/examples/storm-perf/src/main/conf/FileReadWordCountTopo.yaml b/examples/storm-perf/src/main/conf/FileReadWordCountTopo.yaml
new file mode 100644
index 0000000..61abe8f
--- /dev/null
+++ b/examples/storm-perf/src/main/conf/FileReadWordCountTopo.yaml
@@ -0,0 +1,23 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+spout.count : 1
+splitter.count : 1
+counter.count : 1
+input.file : "/Users/roshan/Projects/idea/storm/storm-perf/src/main/resources/randomwords.txt"
+
+# storm config overrides
+topology.workers : 1
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/storm/blob/2a369e40/examples/storm-perf/src/main/conf/HdfsSpoutNullBoltTopo.yaml
----------------------------------------------------------------------
diff --git a/examples/storm-perf/src/main/conf/HdfsSpoutNullBoltTopo.yaml b/examples/storm-perf/src/main/conf/HdfsSpoutNullBoltTopo.yaml
new file mode 100644
index 0000000..a06ad6e
--- /dev/null
+++ b/examples/storm-perf/src/main/conf/HdfsSpoutNullBoltTopo.yaml
@@ -0,0 +1,25 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+spout.count : 1
+bolt.count : 1
+hdfs.uri : "hdfs://hdfs.namenode:8020"
+hdfs.source.dir :  "/tmp/storm/in"
+hdfs.archive.dir : "/tmp/storm/done"
+hdfs.bad.dir : "/tmp/storm/bad"
+
+# storm config overrides
+topology.workers : 1
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/storm/blob/2a369e40/examples/storm-perf/src/main/conf/KafkaHdfsTopo.yaml
----------------------------------------------------------------------
diff --git a/examples/storm-perf/src/main/conf/KafkaHdfsTopo.yaml b/examples/storm-perf/src/main/conf/KafkaHdfsTopo.yaml
new file mode 100755
index 0000000..a8ed2f2
--- /dev/null
+++ b/examples/storm-perf/src/main/conf/KafkaHdfsTopo.yaml
@@ -0,0 +1,26 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+spout.count : 1
+bolt.count : 1
+kafka.topic : "kafka_topic"
+zk.uri : "zkhostname:2181"
+hdfs.uri : "hdfs://hdfs.namenode:8020"
+hdfs.dir : "/tmp/storm"
+hdfs.batch : 1000
+
+# storm config overrides
+topology.workers : 1
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/storm/blob/2a369e40/examples/storm-perf/src/main/conf/KafkaSpoutNullBoltTopo.yaml
----------------------------------------------------------------------
diff --git a/examples/storm-perf/src/main/conf/KafkaSpoutNullBoltTopo.yaml b/examples/storm-perf/src/main/conf/KafkaSpoutNullBoltTopo.yaml
new file mode 100644
index 0000000..cde4c2e
--- /dev/null
+++ b/examples/storm-perf/src/main/conf/KafkaSpoutNullBoltTopo.yaml
@@ -0,0 +1,23 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+spout.count : 1
+bolt.count : 1
+kafka.topic : "kafka_topic"
+zk.uri : "zkhostname:2181"
+
+# storm config overrides
+topology.workers : 1
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/storm/blob/2a369e40/examples/storm-perf/src/main/conf/StrGenSpoutHdfsBoltTopo.yaml
----------------------------------------------------------------------
diff --git a/examples/storm-perf/src/main/conf/StrGenSpoutHdfsBoltTopo.yaml b/examples/storm-perf/src/main/conf/StrGenSpoutHdfsBoltTopo.yaml
new file mode 100644
index 0000000..d16431b
--- /dev/null
+++ b/examples/storm-perf/src/main/conf/StrGenSpoutHdfsBoltTopo.yaml
@@ -0,0 +1,25 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+spout.count : 1
+bolt.count : 1
+hdfs.uri : "hdfs://hdfs.namenode:8020"
+hdfs.dir : "/tmp/storm"
+hdfs.batch : 1000
+
+
+# storm config overrides
+topology.workers : 1
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/storm/blob/2a369e40/examples/storm-perf/src/main/java/org/apache/storm/perf/ConstSpoutIdBoltNullBoltTopo.java
----------------------------------------------------------------------
diff --git a/examples/storm-perf/src/main/java/org/apache/storm/perf/ConstSpoutIdBoltNullBoltTopo.java b/examples/storm-perf/src/main/java/org/apache/storm/perf/ConstSpoutIdBoltNullBoltTopo.java
new file mode 100644
index 0000000..11c63d3
--- /dev/null
+++ b/examples/storm-perf/src/main/java/org/apache/storm/perf/ConstSpoutIdBoltNullBoltTopo.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License
+ */
+
+package org.apache.storm.perf;
+
+import org.apache.storm.Config;
+import org.apache.storm.LocalCluster;
+import org.apache.storm.generated.StormTopology;
+import org.apache.storm.perf.bolt.DevNullBolt;
+import org.apache.storm.perf.bolt.IdBolt;
+import org.apache.storm.perf.spout.ConstSpout;
+import org.apache.storm.perf.utils.Helper;
+import org.apache.storm.topology.TopologyBuilder;
+import org.apache.storm.utils.Utils;
+
+import java.util.Map;
+
+/**
+ * ConstSpout -> IdBolt -> DevNullBolt
+ * This topology measures speed of messaging between spouts->bolt  and  bolt->bolt
+ *   ConstSpout : Continuously emits a constant string
+ *   IdBolt : clones and emits input tuples
+ *   DevNullBolt : discards incoming tuples
+ */
+public class ConstSpoutIdBoltNullBoltTopo {
+
+    public static final String TOPOLOGY_NAME = "ConstSpoutIdBoltNullBoltTopo";
+    public static final String SPOUT_ID = "constSpout";
+    public static final String BOLT1_ID = "idBolt";
+    public static final String BOLT2_ID = "nullBolt";
+
+    // Configs
+    public static final String BOLT1_COUNT = "bolt1.count";
+    public static final String BOLT2_COUNT = "bolt2.count";
+    public static final String SPOUT_COUNT = "spout.count";
+
+    public static StormTopology getTopology(Map conf) {
+
+        // 1 -  Setup Spout   --------
+        ConstSpout spout = new ConstSpout("some data").withOutputFields("str");
+
+        // 2 -  Setup IdBolt & DevNullBolt   --------
+        IdBolt bolt1 = new IdBolt();
+        DevNullBolt bolt2 = new DevNullBolt();
+
+
+        // 3 - Setup Topology  --------
+        TopologyBuilder builder = new TopologyBuilder();
+
+        builder.setSpout(SPOUT_ID, spout,  Helper.getInt(conf, SPOUT_COUNT, 1) );
+
+        builder.setBolt(BOLT1_ID, bolt1, Helper.getInt(conf, BOLT1_COUNT, 1))
+                .localOrShuffleGrouping(SPOUT_ID);
+
+        builder.setBolt(BOLT2_ID, bolt2, Helper.getInt(conf, BOLT2_COUNT, 1))
+                .localOrShuffleGrouping(BOLT1_ID);
+
+        return builder.createTopology();
+    }
+
+
+    public static void main(String[] args) throws Exception {
+
+        if (args.length <= 0) {
+            // submit to local cluster
+            Config conf = new Config();
+            LocalCluster cluster = Helper.runOnLocalCluster(TOPOLOGY_NAME, getTopology(conf));
+
+            Helper.setupShutdownHook(cluster, TOPOLOGY_NAME);
+            while (true) {//  run indefinitely till Ctrl-C
+                Thread.sleep(20_000_000);
+            }
+        } else {
+            // submit to real cluster
+            if (args.length >2) {
+                System.err.println("args: runDurationSec  [optionalConfFile]");
+                return;
+            }
+            Integer durationSec = Integer.parseInt(args[0]);
+            Map topoConf =  (args.length==2) ? Utils.findAndReadConfigFile(args[1])  : new Config();
+
+            //  Submit topology to storm cluster
+            Helper.runOnClusterAndPrintMetrics(durationSec, TOPOLOGY_NAME, topoConf, getTopology(topoConf));
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/2a369e40/examples/storm-perf/src/main/java/org/apache/storm/perf/ConstSpoutNullBoltTopo.java
----------------------------------------------------------------------
diff --git a/examples/storm-perf/src/main/java/org/apache/storm/perf/ConstSpoutNullBoltTopo.java b/examples/storm-perf/src/main/java/org/apache/storm/perf/ConstSpoutNullBoltTopo.java
new file mode 100755
index 0000000..92c2787
--- /dev/null
+++ b/examples/storm-perf/src/main/java/org/apache/storm/perf/ConstSpoutNullBoltTopo.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License
+ */
+
+package org.apache.storm.perf;
+
+import org.apache.storm.Config;
+import org.apache.storm.LocalCluster;
+import org.apache.storm.generated.StormTopology;
+import org.apache.storm.perf.bolt.DevNullBolt;
+import org.apache.storm.perf.spout.ConstSpout;
+import org.apache.storm.perf.utils.Helper;
+import org.apache.storm.topology.BoltDeclarer;
+import org.apache.storm.topology.TopologyBuilder;
+import org.apache.storm.utils.Utils;
+
+import java.util.Map;
+
+/***
+ * This topo helps measure the messaging speed between a spout and a bolt.
+ *  Spout generates a stream of a fixed string.
+ *  Bolt will simply ack and discard the tuple received
+ */
+
+public class ConstSpoutNullBoltTopo {
+
+    public static final String TOPOLOGY_NAME = "ConstSpoutNullBoltTopo";
+    public static final String SPOUT_ID = "constSpout";
+    public static final String BOLT_ID = "nullBolt";
+
+    // Configs
+    public static final String BOLT_COUNT = "bolt.count";
+    public static final String SPOUT_COUNT = "spout.count";
+    public static final String GROUPING = "grouping"; // can be 'local' or 'shuffle'
+
+    public static final String LOCAL_GROPING = "local";
+    public static final String SHUFFLE_GROUPING = "shuffle";
+    public static final String DEFAULT_GROUPING = LOCAL_GROPING;
+
+    public static StormTopology getTopology(Map conf) {
+
+        // 1 -  Setup Spout   --------
+        ConstSpout spout = new ConstSpout("some data").withOutputFields("str");
+
+        // 2 -  Setup DevNull Bolt   --------
+        DevNullBolt bolt = new DevNullBolt();
+
+
+        // 3 - Setup Topology  --------
+        TopologyBuilder builder = new TopologyBuilder();
+
+        builder.setSpout(SPOUT_ID, spout,  Helper.getInt(conf, SPOUT_COUNT, 1) );
+        BoltDeclarer bd = builder.setBolt(BOLT_ID, bolt, Helper.getInt(conf, BOLT_COUNT, 1));
+
+        String groupingType = Helper.getStr(conf, GROUPING);
+        if(groupingType==null || groupingType.equalsIgnoreCase(DEFAULT_GROUPING) )
+            bd.localOrShuffleGrouping(SPOUT_ID);
+        else if(groupingType.equalsIgnoreCase(SHUFFLE_GROUPING) )
+            bd.shuffleGrouping(SPOUT_ID);
+        return builder.createTopology();
+    }
+
+    /**
+     * ConstSpout -> DevNullBolt with configurable grouping (default localOrShuffle)
+     */
+    public static void main(String[] args) throws Exception {
+
+        if(args.length <= 0) {
+            // For IDE based profiling ... submit topology to local cluster
+            Config conf = new Config();
+            final LocalCluster cluster = Helper.runOnLocalCluster(TOPOLOGY_NAME, getTopology(conf));
+
+            Helper.setupShutdownHook(cluster, TOPOLOGY_NAME);
+            while (true) {//  run indefinitely till Ctrl-C
+                Thread.sleep(20_000_000);
+            }
+
+        } else {
+            // For measuring perf against a Storm cluster
+            if (args.length > 2) {
+                System.err.println("args: runDurationSec  [optionalConfFile]");
+                return;
+            }
+            Integer durationSec = Integer.parseInt(args[0]);
+            Map topoConf =  (args.length==2) ? Utils.findAndReadConfigFile(args[1])  : new Config();
+
+            //  Submit topology to storm cluster
+            Helper.runOnClusterAndPrintMetrics(durationSec, TOPOLOGY_NAME, topoConf, getTopology(topoConf));
+        }
+    }
+
+}
+

http://git-wip-us.apache.org/repos/asf/storm/blob/2a369e40/examples/storm-perf/src/main/java/org/apache/storm/perf/ConstSpoutOnlyTopo.java
----------------------------------------------------------------------
diff --git a/examples/storm-perf/src/main/java/org/apache/storm/perf/ConstSpoutOnlyTopo.java b/examples/storm-perf/src/main/java/org/apache/storm/perf/ConstSpoutOnlyTopo.java
new file mode 100755
index 0000000..721ae3d
--- /dev/null
+++ b/examples/storm-perf/src/main/java/org/apache/storm/perf/ConstSpoutOnlyTopo.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License
+ */
+
+package org.apache.storm.perf;
+
+import org.apache.storm.Config;
+import org.apache.storm.LocalCluster;
+import org.apache.storm.generated.StormTopology;
+import org.apache.storm.perf.spout.ConstSpout;
+import org.apache.storm.perf.utils.Helper;
+import org.apache.storm.topology.TopologyBuilder;
+
+
+/***
+ * This topo helps measure how fast a spout can produce data (so no bolts are attached)
+ *  Spout generates a stream of a fixed string.
+ */
+
+public class ConstSpoutOnlyTopo {
+
+    public static final String TOPOLOGY_NAME = "ConstSpoutOnlyTopo";
+    public static final String SPOUT_ID = "constSpout";
+
+
+    public static StormTopology getTopology() {
+
+        // 1 -  Setup Const Spout   --------
+        ConstSpout spout = new ConstSpout("some data").withOutputFields("str");
+
+        // 2 - Setup Topology  --------
+        TopologyBuilder builder = new TopologyBuilder();
+        builder.setSpout(SPOUT_ID, spout, 1);
+        return builder.createTopology();
+    }
+
+    /**
+     * ConstSpout only topology  (No bolts)
+     */
+    public static void main(String[] args) throws Exception {
+        if(args.length <= 0) {
+            // For IDE based profiling ... submit topology to local cluster
+            LocalCluster cluster = Helper.runOnLocalCluster(TOPOLOGY_NAME, getTopology());
+
+            Helper.setupShutdownHook(cluster, TOPOLOGY_NAME);
+            while (true) {//  run indefinitely till Ctrl-C
+                Thread.sleep(20_000_000);
+            }
+        } else {
+            //  Submit topology to storm cluster
+            if (args.length != 1) {
+                System.err.println("args: runDurationSec");
+                return;
+            }
+            Integer durationSec = Integer.parseInt(args[0]);
+
+            Helper.runOnClusterAndPrintMetrics(durationSec, TOPOLOGY_NAME, new Config(), getTopology());
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/2a369e40/examples/storm-perf/src/main/java/org/apache/storm/perf/FileReadWordCountTopo.java
----------------------------------------------------------------------
diff --git a/examples/storm-perf/src/main/java/org/apache/storm/perf/FileReadWordCountTopo.java b/examples/storm-perf/src/main/java/org/apache/storm/perf/FileReadWordCountTopo.java
new file mode 100644
index 0000000..d518c86
--- /dev/null
+++ b/examples/storm-perf/src/main/java/org/apache/storm/perf/FileReadWordCountTopo.java
@@ -0,0 +1,96 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License
+*/
+package org.apache.storm.perf;
+
+
+import org.apache.storm.Config;
+import org.apache.storm.LocalCluster;
+import org.apache.storm.generated.StormTopology;
+import org.apache.storm.perf.bolt.CountBolt;
+import org.apache.storm.perf.bolt.SplitSentenceBolt;
+import org.apache.storm.perf.spout.FileReadSpout;
+import org.apache.storm.perf.utils.Helper;
+import org.apache.storm.topology.TopologyBuilder;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.utils.Utils;
+
+
+import java.util.Map;
+
+/***
+ * This topo helps measure speed of word count.
+ *  Spout loads a file into memory on initialization, then emits the lines in an endless loop.
+ */
+
+public class FileReadWordCountTopo {
+    public static final String SPOUT_ID =   "spout";
+    public static final String COUNT_ID =   "counter";
+    public static final String SPLIT_ID =   "splitter";
+    public static final String TOPOLOGY_NAME = "FileReadWordCountTopo";
+
+    // Config settings
+    public static final String SPOUT_NUM =  "spout.count";
+    public static final String SPLIT_NUM =  "splitter.count";
+    public static final String COUNT_NUM =  "counter.count";
+    public static final String INPUT_FILE = "input.file";
+
+    public static final int DEFAULT_SPOUT_NUM = 1;
+    public static final int DEFAULT_SPLIT_BOLT_NUM = 2;
+    public static final int DEFAULT_COUNT_BOLT_NUM = 2;
+
+
+    public static StormTopology getTopology(Map config) {
+
+        final int spoutNum = Helper.getInt(config, SPOUT_NUM, DEFAULT_SPOUT_NUM);
+        final int spBoltNum = Helper.getInt(config, SPLIT_NUM, DEFAULT_SPLIT_BOLT_NUM);
+        final int cntBoltNum = Helper.getInt(config, COUNT_NUM, DEFAULT_COUNT_BOLT_NUM);
+        final String inputFile = Helper.getStr(config, INPUT_FILE);
+
+        TopologyBuilder builder = new TopologyBuilder();
+        builder.setSpout(SPOUT_ID, new FileReadSpout(inputFile), spoutNum);
+        builder.setBolt(SPLIT_ID, new SplitSentenceBolt(), spBoltNum).localOrShuffleGrouping(SPOUT_ID);
+        builder.setBolt(COUNT_ID, new CountBolt(), cntBoltNum).fieldsGrouping(SPLIT_ID, new Fields(SplitSentenceBolt.FIELDS));
+
+        return builder.createTopology();
+    }
+
+    public static void main(String[] args) throws Exception {
+        if(args.length <= 0) {
+            // For IDE based profiling ... submit topology to local cluster
+            Config conf = new Config();
+            conf.put(INPUT_FILE, "resources/randomwords.txt");
+            LocalCluster cluster = Helper.runOnLocalCluster(TOPOLOGY_NAME, getTopology(conf));
+
+            Helper.setupShutdownHook(cluster, TOPOLOGY_NAME);
+            while (true) {//  run indefinitely till Ctrl-C
+                Thread.sleep(20_000_000);
+            }
+        } else {
+            //  Submit to Storm cluster
+            if (args.length !=2) {
+                System.err.println("args: runDurationSec  confFile");
+                return;
+            }
+            Integer durationSec = Integer.parseInt(args[0]);
+            Map topoConf = Utils.findAndReadConfigFile(args[1]);
+
+            Helper.runOnClusterAndPrintMetrics(durationSec, TOPOLOGY_NAME, topoConf, getTopology(topoConf));
+
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/2a369e40/examples/storm-perf/src/main/java/org/apache/storm/perf/HdfsSpoutNullBoltTopo.java
----------------------------------------------------------------------
diff --git a/examples/storm-perf/src/main/java/org/apache/storm/perf/HdfsSpoutNullBoltTopo.java b/examples/storm-perf/src/main/java/org/apache/storm/perf/HdfsSpoutNullBoltTopo.java
new file mode 100644
index 0000000..248b523
--- /dev/null
+++ b/examples/storm-perf/src/main/java/org/apache/storm/perf/HdfsSpoutNullBoltTopo.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ *   or more contributor license agreements.  See the NOTICE file
+ *   distributed with this work for additional information
+ *   regarding copyright ownership.  The ASF licenses this file
+ *   to you under the Apache License, Version 2.0 (the
+ *   "License"); you may not use this file except in compliance
+ *   with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing, software
+ *   distributed under the License is distributed on an "AS IS" BASIS,
+ *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *   See the License for the specific language governing permissions and
+ *   limitations under the License.
+ */
+
+package org.apache.storm.perf;
+
+import org.apache.storm.generated.StormTopology;
+import org.apache.storm.hdfs.spout.HdfsSpout;
+import org.apache.storm.hdfs.spout.TextFileReader;
+import org.apache.storm.perf.bolt.DevNullBolt;
+import org.apache.storm.perf.utils.Helper;
+import org.apache.storm.topology.TopologyBuilder;
+import org.apache.storm.utils.Utils;
+
+import java.util.Map;
+
+/***
+ * This topo helps measure speed of reading from Hdfs.
+ *  Spout Reads from Hdfs.
+ *  Bolt acks and discards tuples
+ */
+
+
+public class HdfsSpoutNullBoltTopo {
+    // names
+    static final String TOPOLOGY_NAME = "HdfsSpoutNullBoltTopo";
+    static final String SPOUT_ID = "hdfsSpout";
+    static final String BOLT_ID = "devNullBolt";
+
+    // configs
+    static final String SPOUT_NUM = "spout.count";
+    static final String BOLT_NUM = "bolt.count";
+
+    static final String HDFS_URI    = "hdfs.uri";
+    static final String SOURCE_DIR  = "hdfs.source.dir";
+    static final String ARCHIVE_DIR = "hdfs.archive.dir";
+    static final String BAD_DIR     = "hdfs.bad.dir";
+
+    public static final int DEFAULT_SPOUT_NUM = 1;
+    public static final int DEFAULT_BOLT_NUM = 1;
+
+
+    public static StormTopology getTopology(Map config) {
+
+        final int spoutNum = Helper.getInt(config, SPOUT_NUM, DEFAULT_SPOUT_NUM);
+        final int boltNum = Helper.getInt(config, BOLT_NUM, DEFAULT_BOLT_NUM);
+        final String fileFormat = Helper.getStr(config, "text");
+        final String hdfsUri = Helper.getStr(config, HDFS_URI);
+        final String sourceDir = Helper.getStr(config, SOURCE_DIR);
+        final String archiveDir = Helper.getStr(config, ARCHIVE_DIR);
+        final String badDir = Helper.getStr(config, BAD_DIR);
+
+
+        // 1 -  Setup Hdfs Spout   --------
+        HdfsSpout spout = new HdfsSpout()
+                .setReaderType(fileFormat)
+                .setHdfsUri(hdfsUri)
+                .setSourceDir(sourceDir)
+                .setArchiveDir(archiveDir)
+                .setBadFilesDir(badDir)
+                .withOutputFields(TextFileReader.defaultFields);
+
+        // 2 -   DevNull Bolt   --------
+        DevNullBolt bolt = new DevNullBolt();
+
+        // 3 - Setup Topology  --------
+        TopologyBuilder builder = new TopologyBuilder();
+        builder.setSpout(SPOUT_ID, spout, spoutNum);
+        builder.setBolt(BOLT_ID, bolt, boltNum)
+                .localOrShuffleGrouping(SPOUT_ID);
+
+        return builder.createTopology();
+    }
+
+    public static void main(String[] args) throws Exception {
+        if (args.length != 2) {
+            System.err.println("args: runDurationSec topConfFile");
+            return;
+        }
+
+        Integer durationSec = Integer.parseInt(args[0]);
+        Map topoConf = Utils.findAndReadConfigFile(args[1]);
+
+        // Submit to Storm cluster
+        Helper.runOnClusterAndPrintMetrics(durationSec, TOPOLOGY_NAME, topoConf, getTopology(topoConf));
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/2a369e40/examples/storm-perf/src/main/java/org/apache/storm/perf/KafkaHdfsTopo.java
----------------------------------------------------------------------
diff --git a/examples/storm-perf/src/main/java/org/apache/storm/perf/KafkaHdfsTopo.java b/examples/storm-perf/src/main/java/org/apache/storm/perf/KafkaHdfsTopo.java
new file mode 100755
index 0000000..4293aac
--- /dev/null
+++ b/examples/storm-perf/src/main/java/org/apache/storm/perf/KafkaHdfsTopo.java
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License
+ */
+
+package org.apache.storm.perf;
+
+import org.apache.storm.generated.StormTopology;
+import org.apache.storm.hdfs.bolt.HdfsBolt;
+import org.apache.storm.hdfs.bolt.format.DefaultFileNameFormat;
+import org.apache.storm.hdfs.bolt.format.FileNameFormat;
+import org.apache.storm.hdfs.bolt.format.RecordFormat;
+import org.apache.storm.hdfs.bolt.rotation.FileRotationPolicy;
+import org.apache.storm.hdfs.bolt.rotation.FileSizeRotationPolicy;
+import org.apache.storm.hdfs.bolt.sync.CountSyncPolicy;
+import org.apache.storm.hdfs.bolt.sync.SyncPolicy;
+import org.apache.storm.kafka.BrokerHosts;
+import org.apache.storm.kafka.KafkaSpout;
+import org.apache.storm.kafka.SpoutConfig;
+import org.apache.storm.kafka.StringMultiSchemeWithTopic;
+import org.apache.storm.kafka.ZkHosts;
+import org.apache.storm.perf.utils.Helper;
+import org.apache.storm.topology.TopologyBuilder;
+import org.apache.storm.tuple.Tuple;
+import org.apache.storm.utils.Utils;
+
+import java.util.Map;
+import java.util.UUID;
+
+/***
+ * This topo helps measure speed of reading from Kafka and writing to Hdfs.
+ *  Spout Reads from Kafka.
+ *  Bolt writes to Hdfs
+ */
+
+public class KafkaHdfsTopo {
+
+  // configs - topo parallelism
+  public static final String SPOUT_NUM = "spout.count";
+  public static final String BOLT_NUM = "bolt.count";
+  // configs - kafka spout
+  public static final String KAFKA_TOPIC = "kafka.topic";
+  public static final String ZOOKEEPER_URI = "zk.uri";
+  // configs - hdfs bolt
+  public static final String HDFS_URI = "hdfs.uri";
+  public static final String HDFS_PATH = "hdfs.dir";
+  public static final String HDFS_BATCH = "hdfs.batch";
+
+
+  public static final int DEFAULT_SPOUT_NUM = 1;
+  public static final int DEFAULT_BOLT_NUM = 1;
+  public static final int DEFAULT_HDFS_BATCH = 1000;
+
+  // names
+  public static final String TOPOLOGY_NAME = "KafkaHdfsTopo";
+  public static final String SPOUT_ID = "kafkaSpout";
+  public static final String BOLT_ID = "hdfsBolt";
+
+
+
+  public static StormTopology getTopology(Map config) {
+
+    final int spoutNum = getInt(config, SPOUT_NUM, DEFAULT_SPOUT_NUM);
+    final int boltNum = getInt(config, BOLT_NUM, DEFAULT_BOLT_NUM);
+
+    final int hdfsBatch = getInt(config, HDFS_BATCH, DEFAULT_HDFS_BATCH);
+
+    // 1 -  Setup Kafka Spout   --------
+    String zkConnString = getStr(config, ZOOKEEPER_URI);
+    String topicName = getStr(config, KAFKA_TOPIC);
+
+    BrokerHosts brokerHosts = new ZkHosts(zkConnString);
+    SpoutConfig spoutConfig = new SpoutConfig(brokerHosts, topicName, "/" + topicName, UUID.randomUUID().toString());
+    spoutConfig.scheme = new StringMultiSchemeWithTopic();
+    spoutConfig.ignoreZkOffsets = true;
+
+    KafkaSpout spout = new KafkaSpout(spoutConfig);
+
+    // 2 -  Setup HFS Bolt   --------
+    String Hdfs_url = getStr(config, HDFS_URI);
+    RecordFormat format = new LineWriter("str");
+    SyncPolicy syncPolicy = new CountSyncPolicy(hdfsBatch);
+    FileRotationPolicy rotationPolicy = new FileSizeRotationPolicy(1.0f, FileSizeRotationPolicy.Units.GB);
+
+    FileNameFormat fileNameFormat = new DefaultFileNameFormat().withPath(getStr(config,HDFS_PATH) );
+
+    // Instantiate the HdfsBolt
+    HdfsBolt bolt = new HdfsBolt()
+            .withFsUrl(Hdfs_url)
+            .withFileNameFormat(fileNameFormat)
+            .withRecordFormat(format)
+            .withRotationPolicy(rotationPolicy)
+            .withSyncPolicy(syncPolicy);
+
+
+    // 3 - Setup Topology  --------
+    TopologyBuilder builder = new TopologyBuilder();
+    builder.setSpout(SPOUT_ID, spout, spoutNum);
+    builder.setBolt(BOLT_ID, bolt, boltNum)
+            .localOrShuffleGrouping(SPOUT_ID);
+
+    return builder.createTopology();
+  }
+
+
+  public static int getInt(Map map, Object key, int def) {
+    return Utils.getInt(Utils.get(map, key, def));
+  }
+
+  public static String getStr(Map map, Object key) {
+    return (String) map.get(key);
+  }
+
+
+    /** Copies text file content from sourceDir to destinationDir. Moves source files into sourceDir after its done consuming */
+    public static void main(String[] args) throws Exception {
+
+        if (args.length != 2) {
+            System.err.println("args: runDurationSec topConfFile");
+            return;
+        }
+
+        Integer durationSec = Integer.parseInt(args[0]);
+        String confFile = args[1];
+        Map topoConf = Utils.findAndReadConfigFile(confFile);
+
+        //  Submit topology to Storm cluster
+        Helper.runOnClusterAndPrintMetrics(durationSec, TOPOLOGY_NAME, topoConf, getTopology(topoConf));
+    }
+
+    public static class LineWriter implements RecordFormat {
+        private String lineDelimiter = System.lineSeparator();
+        private String fieldName;
+
+        public LineWriter(String fieldName) {
+            this.fieldName = fieldName;
+        }
+
+        /**
+         * Overrides the default record delimiter.
+         *
+         * @param delimiter
+         * @return
+         */
+        public LineWriter withLineDelimiter(String delimiter){
+            this.lineDelimiter = delimiter;
+            return this;
+        }
+
+        @Override
+        public byte[] format(Tuple tuple) {
+            return (tuple.getValueByField(fieldName).toString() +  this.lineDelimiter).getBytes();
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/storm/blob/2a369e40/examples/storm-perf/src/main/java/org/apache/storm/perf/KafkaSpoutNullBoltTopo.java
----------------------------------------------------------------------
diff --git a/examples/storm-perf/src/main/java/org/apache/storm/perf/KafkaSpoutNullBoltTopo.java b/examples/storm-perf/src/main/java/org/apache/storm/perf/KafkaSpoutNullBoltTopo.java
new file mode 100755
index 0000000..3512c65
--- /dev/null
+++ b/examples/storm-perf/src/main/java/org/apache/storm/perf/KafkaSpoutNullBoltTopo.java
@@ -0,0 +1,114 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License
+*/
+
+package org.apache.storm.perf;
+
+import org.apache.storm.generated.StormTopology;
+import org.apache.storm.kafka.BrokerHosts;
+import org.apache.storm.kafka.KafkaSpout;
+import org.apache.storm.kafka.SpoutConfig;
+import org.apache.storm.kafka.StringMultiSchemeWithTopic;
+import org.apache.storm.kafka.ZkHosts;
+import org.apache.storm.perf.bolt.DevNullBolt;
+import org.apache.storm.perf.utils.Helper;
+import org.apache.storm.topology.TopologyBuilder;
+import org.apache.storm.utils.Utils;
+
+import java.util.Map;
+import java.util.UUID;
+
+
+/***
+ * This topo helps measure speed of reading from Kafka
+ *   Spout Reads from Kafka.
+ *   Bolt acks and discards tuples
+ */
+
+public class KafkaSpoutNullBoltTopo {
+
+    // configs - topo parallelism
+    public static final String SPOUT_NUM = "spout.count";
+    public static final String BOLT_NUM = "bolt.count";
+
+    // configs - kafka spout
+    public static final String KAFKA_TOPIC = "kafka.topic";
+    public static final String ZOOKEEPER_URI = "zk.uri";
+
+
+    public static final int DEFAULT_SPOUT_NUM = 1;
+    public static final int DEFAULT_BOLT_NUM = 1;
+
+    // names
+    public static final String TOPOLOGY_NAME = "KafkaSpoutNullBoltTopo";
+    public static final String SPOUT_ID = "kafkaSpout";
+    public static final String BOLT_ID = "devNullBolt";
+
+
+    public static StormTopology getTopology(Map config) {
+
+        final int spoutNum = getInt(config, SPOUT_NUM, DEFAULT_SPOUT_NUM);
+        final int boltNum = getInt(config, BOLT_NUM, DEFAULT_BOLT_NUM);
+        // 1 -  Setup Kafka Spout   --------
+
+        String zkConnString = getStr(config, ZOOKEEPER_URI);
+        String topicName = getStr(config, KAFKA_TOPIC);
+
+        BrokerHosts brokerHosts = new ZkHosts(zkConnString);
+        SpoutConfig spoutConfig = new SpoutConfig(brokerHosts, topicName, "/" + topicName, UUID.randomUUID().toString());
+        spoutConfig.scheme = new StringMultiSchemeWithTopic();
+        spoutConfig.ignoreZkOffsets = true;
+
+        KafkaSpout spout = new KafkaSpout(spoutConfig);
+
+        // 2 -   DevNull Bolt   --------
+        DevNullBolt bolt = new DevNullBolt();
+
+        // 3 - Setup Topology  --------
+        TopologyBuilder builder = new TopologyBuilder();
+        builder.setSpout(SPOUT_ID, spout, spoutNum);
+        builder.setBolt(BOLT_ID, bolt, boltNum)
+                .localOrShuffleGrouping(SPOUT_ID);
+
+        return builder.createTopology();
+    }
+
+
+    public static int getInt(Map map, Object key, int def) {
+        return Utils.getInt(Utils.get(map, key, def));
+    }
+
+    public static String getStr(Map map, Object key) {
+        return (String) map.get(key);
+    }
+
+
+    /**
+     * Copies text file content from sourceDir to destinationDir. Moves source files into sourceDir after its done consuming
+     */
+    public static void main(String[] args) throws Exception {
+        if (args.length !=2) {
+            System.err.println("args: runDurationSec confFile");
+            return;
+        }
+        Integer durationSec = Integer.parseInt(args[0]);
+        Map topoConf = Utils.findAndReadConfigFile(args[1]);
+
+        //  Submit to Storm cluster
+        Helper.runOnClusterAndPrintMetrics(durationSec, TOPOLOGY_NAME, topoConf, getTopology(topoConf));
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/2a369e40/examples/storm-perf/src/main/java/org/apache/storm/perf/StrGenSpoutHdfsBoltTopo.java
----------------------------------------------------------------------
diff --git a/examples/storm-perf/src/main/java/org/apache/storm/perf/StrGenSpoutHdfsBoltTopo.java b/examples/storm-perf/src/main/java/org/apache/storm/perf/StrGenSpoutHdfsBoltTopo.java
new file mode 100755
index 0000000..5b97540
--- /dev/null
+++ b/examples/storm-perf/src/main/java/org/apache/storm/perf/StrGenSpoutHdfsBoltTopo.java
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License
+ */
+
+
+package org.apache.storm.perf;
+
+import org.apache.storm.LocalCluster;
+import org.apache.storm.generated.StormTopology;
+import org.apache.storm.hdfs.bolt.HdfsBolt;
+import org.apache.storm.hdfs.bolt.format.DefaultFileNameFormat;
+import org.apache.storm.hdfs.bolt.format.FileNameFormat;
+import org.apache.storm.hdfs.bolt.format.RecordFormat;
+import org.apache.storm.hdfs.bolt.rotation.FileRotationPolicy;
+import org.apache.storm.hdfs.bolt.rotation.FileSizeRotationPolicy;
+import org.apache.storm.hdfs.bolt.sync.CountSyncPolicy;
+import org.apache.storm.hdfs.bolt.sync.SyncPolicy;
+import org.apache.storm.perf.spout.StringGenSpout;
+import org.apache.storm.perf.utils.Helper;
+import org.apache.storm.topology.TopologyBuilder;
+import org.apache.storm.tuple.Tuple;
+import org.apache.storm.utils.Utils;
+
+import java.util.Map;
+
+/***
+ * This topo helps measure speed of writing to Hdfs
+ *  Spout generates fixed length random strings.
+ *  Bolt writes to Hdfs
+ */
+
+public class StrGenSpoutHdfsBoltTopo {
+
+    // configs - topo parallelism
+    public static final String SPOUT_NUM = "spout.count";
+    public static final String BOLT_NUM =  "bolt.count";
+
+    // configs - hdfs bolt
+    public static final String HDFS_URI   = "hdfs.uri";
+    public static final String HDFS_PATH  = "hdfs.dir";
+    public static final String HDFS_BATCH = "hdfs.batch";
+
+    public static final int DEFAULT_SPOUT_NUM = 1;
+    public static final int DEFAULT_BOLT_NUM = 1;
+    public static final int DEFAULT_HDFS_BATCH = 1000;
+
+    // names
+    public static final String TOPOLOGY_NAME = "StrGenSpoutHdfsBoltTopo";
+    public static final String SPOUT_ID = "GenSpout";
+    public static final String BOLT_ID = "hdfsBolt";
+
+
+    public static StormTopology getTopology(Map topoConf) {
+        final int hdfsBatch = Helper.getInt(topoConf, HDFS_BATCH, DEFAULT_HDFS_BATCH);
+
+        // 1 -  Setup StringGen Spout   --------
+        StringGenSpout spout = new StringGenSpout(100).withFieldName("str");
+
+
+        // 2 -  Setup HFS Bolt   --------
+        String Hdfs_url = Helper.getStr(topoConf, HDFS_URI);
+        RecordFormat format = new LineWriter("str");
+        SyncPolicy syncPolicy = new CountSyncPolicy(hdfsBatch);
+        FileRotationPolicy rotationPolicy = new FileSizeRotationPolicy(1.0f, FileSizeRotationPolicy.Units.GB);
+        final int spoutNum = Helper.getInt(topoConf, SPOUT_NUM, DEFAULT_SPOUT_NUM);
+        final int boltNum = Helper.getInt(topoConf, BOLT_NUM, DEFAULT_BOLT_NUM);
+
+        // Use default, Storm-generated file names
+        FileNameFormat fileNameFormat = new DefaultFileNameFormat().withPath(Helper.getStr(topoConf, HDFS_PATH) );
+
+        // Instantiate the HdfsBolt
+        HdfsBolt bolt = new HdfsBolt()
+                .withFsUrl(Hdfs_url)
+                .withFileNameFormat(fileNameFormat)
+                .withRecordFormat(format)
+                .withRotationPolicy(rotationPolicy)
+                .withSyncPolicy(syncPolicy);
+
+
+        // 3 - Setup Topology  --------
+
+        TopologyBuilder builder = new TopologyBuilder();
+        builder.setSpout(SPOUT_ID, spout, spoutNum);
+        builder.setBolt(BOLT_ID, bolt, boltNum)
+                .localOrShuffleGrouping(SPOUT_ID);
+
+        return builder.createTopology();
+    }
+
+
+    /** Spout generates random strings and HDFS bolt writes them to a text file */
+    public static void main(String[] args) throws Exception {
+        if(args.length <= 0) {
+            // submit to local cluster
+            Map topoConf = Utils.findAndReadConfigFile("conf/HdfsSpoutTopo.yaml");
+            LocalCluster cluster = Helper.runOnLocalCluster(TOPOLOGY_NAME, getTopology(topoConf));
+
+            Helper.setupShutdownHook(cluster, TOPOLOGY_NAME);
+            while (true) {//  run indefinitely till Ctrl-C
+                Thread.sleep(20_000_000);
+            }
+        } else {
+            //  Submit to Storm cluster
+            if (args.length !=2) {
+                System.err.println("args: runDurationSec confFile");
+                return;
+            }
+            Integer durationSec = Integer.parseInt(args[0]);
+            Map topoConf = Utils.findAndReadConfigFile(args[1]);
+
+            Helper.runOnClusterAndPrintMetrics(durationSec, TOPOLOGY_NAME, topoConf, getTopology(topoConf));
+        }
+    }
+
+
+    public static class LineWriter implements RecordFormat {
+        private String lineDelimiter = System.lineSeparator();
+        private String fieldName;
+
+        public LineWriter(String fieldName) {
+            this.fieldName = fieldName;
+        }
+
+        /**
+         * Overrides the default record delimiter.
+         *
+         * @param delimiter
+         * @return
+         */
+        public LineWriter withLineDelimiter(String delimiter){
+            this.lineDelimiter = delimiter;
+            return this;
+        }
+
+        public byte[] format(Tuple tuple) {
+            return (tuple.getValueByField(fieldName).toString() +  this.lineDelimiter).getBytes();
+        }
+    }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/storm/blob/2a369e40/examples/storm-perf/src/main/java/org/apache/storm/perf/bolt/CountBolt.java
----------------------------------------------------------------------
diff --git a/examples/storm-perf/src/main/java/org/apache/storm/perf/bolt/CountBolt.java b/examples/storm-perf/src/main/java/org/apache/storm/perf/bolt/CountBolt.java
new file mode 100644
index 0000000..b79a0ee
--- /dev/null
+++ b/examples/storm-perf/src/main/java/org/apache/storm/perf/bolt/CountBolt.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License
+ */
+
+package org.apache.storm.perf.bolt;
+
+
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.topology.BasicOutputCollector;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.topology.base.BaseBasicBolt;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Tuple;
+import org.apache.storm.tuple.Values;
+
+import java.util.HashMap;
+import java.util.Map;
+
+public class CountBolt extends BaseBasicBolt {
+    public static final String FIELDS_WORD = "word";
+    public static final String FIELDS_COUNT = "count";
+
+    Map<String, Integer> counts = new HashMap<>();
+
+    @Override
+    public void prepare(Map stormConf, TopologyContext context) {
+    }
+
+    @Override
+    public void execute(Tuple tuple, BasicOutputCollector collector) {
+        String word = tuple.getString(0);
+        Integer count = counts.get(word);
+        if (count == null)
+            count = 0;
+        count++;
+        counts.put(word, count);
+        collector.emit(new Values(word, count));
+    }
+
+    @Override
+    public void declareOutputFields(OutputFieldsDeclarer declarer) {
+        declarer.declare(new Fields(FIELDS_WORD, FIELDS_COUNT));
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/2a369e40/examples/storm-perf/src/main/java/org/apache/storm/perf/bolt/DevNullBolt.java
----------------------------------------------------------------------
diff --git a/examples/storm-perf/src/main/java/org/apache/storm/perf/bolt/DevNullBolt.java b/examples/storm-perf/src/main/java/org/apache/storm/perf/bolt/DevNullBolt.java
new file mode 100755
index 0000000..b85ce15
--- /dev/null
+++ b/examples/storm-perf/src/main/java/org/apache/storm/perf/bolt/DevNullBolt.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License
+ */
+
+package org.apache.storm.perf.bolt;
+
+import org.apache.storm.task.OutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.topology.base.BaseRichBolt;
+import org.apache.storm.tuple.Tuple;
+
+import java.util.Map;
+
+
+public class DevNullBolt extends BaseRichBolt {
+    private OutputCollector collector;
+
+    @Override
+    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
+        this.collector = collector;
+    }
+
+    @Override
+    public void execute(Tuple tuple) {
+        collector.ack(tuple);
+    }
+
+    @Override
+    public void declareOutputFields(OutputFieldsDeclarer declarer) {
+
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/2a369e40/examples/storm-perf/src/main/java/org/apache/storm/perf/bolt/IdBolt.java
----------------------------------------------------------------------
diff --git a/examples/storm-perf/src/main/java/org/apache/storm/perf/bolt/IdBolt.java b/examples/storm-perf/src/main/java/org/apache/storm/perf/bolt/IdBolt.java
new file mode 100644
index 0000000..116265e
--- /dev/null
+++ b/examples/storm-perf/src/main/java/org/apache/storm/perf/bolt/IdBolt.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License
+ */
+
+package org.apache.storm.perf.bolt;
+
+import org.apache.storm.task.OutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.topology.base.BaseRichBolt;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Tuple;
+import org.apache.storm.tuple.Values;
+
+import java.util.Map;
+
+public class IdBolt extends BaseRichBolt {
+    private OutputCollector collector;
+
+    @Override
+    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
+        this.collector = collector;
+    }
+
+    @Override
+    public void execute(Tuple tuple) {
+        collector.emit(tuple, new Values( tuple.getValues() ) );
+        collector.ack(tuple);
+    }
+
+    @Override
+    public void declareOutputFields(OutputFieldsDeclarer declarer) {
+        declarer.declare(new Fields("field1"));
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/storm/blob/2a369e40/examples/storm-perf/src/main/java/org/apache/storm/perf/bolt/SplitSentenceBolt.java
----------------------------------------------------------------------
diff --git a/examples/storm-perf/src/main/java/org/apache/storm/perf/bolt/SplitSentenceBolt.java b/examples/storm-perf/src/main/java/org/apache/storm/perf/bolt/SplitSentenceBolt.java
new file mode 100644
index 0000000..96f9f73
--- /dev/null
+++ b/examples/storm-perf/src/main/java/org/apache/storm/perf/bolt/SplitSentenceBolt.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License
+ */
+
+package org.apache.storm.perf.bolt;
+
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.topology.BasicOutputCollector;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.topology.base.BaseBasicBolt;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Tuple;
+import org.apache.storm.tuple.Values;
+
+import java.util.Map;
+
+
+public class SplitSentenceBolt extends BaseBasicBolt {
+    public static final String FIELDS = "word";
+
+    @Override
+    public void prepare(Map stormConf, TopologyContext context) {
+    }
+
+    @Override
+    public void execute(Tuple input, BasicOutputCollector collector) {
+        for (String word : splitSentence(input.getString(0))) {
+            collector.emit(new Values(word));
+        }
+    }
+
+    @Override
+    public void declareOutputFields(OutputFieldsDeclarer declarer) {
+        declarer.declare(new Fields(FIELDS));
+    }
+
+
+    public static String[] splitSentence(String sentence) {
+        if (sentence != null) {
+            return sentence.split("\\s+");
+        }
+        return null;
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/2a369e40/examples/storm-perf/src/main/java/org/apache/storm/perf/spout/ConstSpout.java
----------------------------------------------------------------------
diff --git a/examples/storm-perf/src/main/java/org/apache/storm/perf/spout/ConstSpout.java b/examples/storm-perf/src/main/java/org/apache/storm/perf/spout/ConstSpout.java
new file mode 100755
index 0000000..b66e4f3
--- /dev/null
+++ b/examples/storm-perf/src/main/java/org/apache/storm/perf/spout/ConstSpout.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License
+ */
+
+package org.apache.storm.perf.spout;
+
+
+import org.apache.storm.spout.SpoutOutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.topology.base.BaseRichSpout;
+import org.apache.storm.tuple.Fields;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+public class ConstSpout extends BaseRichSpout {
+
+    private static final String DEFAUT_FIELD_NAME = "str";
+    private String value;
+    private String fieldName = DEFAUT_FIELD_NAME;
+    private SpoutOutputCollector collector = null;
+    private int count=0;
+
+    public ConstSpout(String value) {
+        this.value = value;
+    }
+
+    public ConstSpout withOutputFields(String fieldName) {
+        this.fieldName = fieldName;
+        return this;
+    }
+
+    @Override
+    public void declareOutputFields(OutputFieldsDeclarer declarer) {
+        declarer.declare(new Fields(fieldName));
+    }
+
+    @Override
+    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
+        this.collector = collector;
+    }
+
+    @Override
+    public void nextTuple() {
+        List<Object> tuple = Collections.singletonList((Object) value);
+        collector.emit(tuple, count++);
+    }
+
+    @Override
+    public void ack(Object msgId) {
+        super.ack(msgId);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/2a369e40/examples/storm-perf/src/main/java/org/apache/storm/perf/spout/FileReadSpout.java
----------------------------------------------------------------------
diff --git a/examples/storm-perf/src/main/java/org/apache/storm/perf/spout/FileReadSpout.java b/examples/storm-perf/src/main/java/org/apache/storm/perf/spout/FileReadSpout.java
new file mode 100644
index 0000000..959e7c6
--- /dev/null
+++ b/examples/storm-perf/src/main/java/org/apache/storm/perf/spout/FileReadSpout.java
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License
+ */
+
+package org.apache.storm.perf.spout;
+
+import org.apache.storm.spout.SpoutOutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.topology.base.BaseRichSpout;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Values;
+
+import java.io.BufferedReader;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+public class FileReadSpout extends BaseRichSpout {
+    public static final String FIELDS = "sentence";
+    private static final long serialVersionUID = -2582705611472467172L;
+    private transient FileReader reader;
+    private String file;
+    private boolean ackEnabled = true;
+    private SpoutOutputCollector collector;
+
+    private long count = 0;
+
+
+    public FileReadSpout(String file) {
+        this.file = file;
+    }
+
+    // For testing
+    FileReadSpout(FileReader reader) {
+        this.reader = reader;
+    }
+
+    @Override
+    public void open(Map conf, TopologyContext context,
+                     SpoutOutputCollector collector) {
+        this.collector = collector;
+        Object ackObj = conf.get("topology.acker.executors");
+        if (ackObj != null && ackObj.equals(0)) {
+            this.ackEnabled = false;
+        }
+        // for tests, reader will not be null
+        if (this.reader == null) {
+            this.reader = new FileReader(this.file);
+        }
+    }
+
+    @Override
+    public void nextTuple() {
+        if (ackEnabled) {
+            collector.emit(new Values(reader.nextLine()), count);
+            count++;
+        } else {
+            collector.emit(new Values(reader.nextLine()));
+        }
+    }
+
+    @Override
+    public void declareOutputFields(OutputFieldsDeclarer declarer) {
+        declarer.declare(new Fields(FIELDS));
+    }
+
+    public static List<String> readLines(InputStream input) {
+        List<String> lines = new ArrayList<>();
+        try {
+            BufferedReader reader = new BufferedReader(new InputStreamReader(input));
+            try {
+                String line;
+                while ((line = reader.readLine()) != null) {
+                    lines.add(line);
+                }
+            } catch (IOException e) {
+                throw new RuntimeException("Reading file failed", e);
+            } finally {
+                reader.close();
+            }
+        } catch (IOException e) {
+            throw new RuntimeException("Error closing reader", e);
+        }
+        return lines;
+    }
+
+    public static class FileReader implements Serializable {
+
+        private static final long serialVersionUID = -7012334600647556267L;
+
+        public final String file;
+        private List<String> contents = null;
+        private int index = 0;
+        private int limit = 0;
+
+        public FileReader(String file) {
+            this.file = file;
+            if (this.file != null) {
+                try {
+                    this.contents = readLines(new FileInputStream(this.file));
+                } catch (IOException e) {
+                    e.printStackTrace();
+                    throw new IllegalArgumentException("Cannot open file " + file, e);
+                }
+                this.limit = contents.size();
+            } else {
+                throw new IllegalArgumentException("file name cannot be null");
+            }
+        }
+
+        public String nextLine() {
+            if (index >= limit) {
+                index = 0;
+            }
+            String line = contents.get(index);
+            index++;
+            return line;
+        }
+
+    }
+}