Posted to commits@storm.apache.org by sr...@apache.org on 2015/09/29 05:20:12 UTC

[11/15] storm git commit: add external module docs; misc tweaks

http://git-wip-us.apache.org/repos/asf/storm/blob/af6d8ffc/documentation/flux.md
----------------------------------------------------------------------
diff --git a/documentation/flux.md b/documentation/flux.md
new file mode 100644
index 0000000..6f678d5
--- /dev/null
+++ b/documentation/flux.md
@@ -0,0 +1,835 @@
+---
+title: Flux
+layout: documentation
+documentation: true
+---
+
+A framework for creating and deploying Apache Storm streaming computations with less friction.
+
+## Definition
+**flux** |fləks| _noun_
+
+1. The action or process of flowing or flowing out
+2. Continuous change
+3. In physics, the rate of flow of a fluid, radiant energy, or particles across a given area
+4. A substance mixed with a solid to lower its melting point
+
+## Rationale
+Bad things happen when configuration is hard-coded. No one should have to recompile or repackage an application in
+order to change configuration.
+
+## About
+Flux is a framework and set of utilities that make defining and deploying Apache Storm topologies less painful and
+developer-intensive.
+
+Have you ever found yourself repeating this pattern?
+
+```java
+
+public static void main(String[] args) throws Exception {
+    // logic to determine if we're running locally or not...
+    // create necessary config options...
+    boolean runLocal = shouldRunLocal();
+    if(runLocal){
+        LocalCluster cluster = new LocalCluster();
+        cluster.submitTopology(name, conf, topology);
+    } else {
+        StormSubmitter.submitTopology(name, conf, topology);
+    }
+}
+```
+
+Wouldn't something like this be easier?
+
+```bash
+storm jar mytopology.jar org.apache.storm.flux.Flux --local config.yaml
+```
+
+or:
+
+```bash
+storm jar mytopology.jar org.apache.storm.flux.Flux --remote config.yaml
+```
+
+Another pain point often mentioned is the fact that the wiring for a Topology graph is often tied up in Java code,
+and that any changes require recompilation and repackaging of the topology jar file. Flux aims to alleviate that
+pain by allowing you to package all your Storm components in a single jar, and use an external text file to define
+the layout and configuration of your topologies.
+
+## Features
+
+ * Easily configure and deploy Storm topologies (Both Storm core and Microbatch API) without embedding configuration
+   in your topology code
+ * Support for existing topology code (see below)
+ * Define Storm Core API (Spouts/Bolts) using a flexible YAML DSL
+ * YAML DSL support for most Storm components (storm-kafka, storm-hdfs, storm-hbase, etc.)
+ * Convenient support for multi-lang components
+ * External property substitution/filtering for easily switching between configurations/environments (similar to Maven-style
+   `${variable.name}` substitution)
+
+## Usage
+
+To use Flux, add it as a dependency and package all your Storm components in a fat jar, then create a YAML document
+to define your topology (see below for YAML configuration options).
+
+### Building from Source
+The easiest way to use Flux is to add it as a Maven dependency to your project, as described below.
+
+If you would like to build Flux from source and run the unit/integration tests, you will need the following installed
+on your system:
+
+* Python 2.6.x or later
+* Node.js 0.10.x or later
+
+#### Building with unit tests enabled:
+
+```
+mvn clean install
+```
+
+#### Building with unit tests disabled:
+If you would like to build Flux without installing Python or Node.js you can simply skip the unit tests:
+
+```
+mvn clean install -DskipTests=true
+```
+
+Note that if you plan on using Flux to deploy topologies to a remote cluster, you will still need to have Python
+installed since it is required by Apache Storm.
+
+
+#### Building with integration tests enabled:
+
+```
+mvn clean install -DskipIntegration=false
+```
+
+
+### Packaging with Maven
+To enable Flux for your Storm components, you need to add it as a dependency such that it's included in the Storm
+topology jar. This can be accomplished with the Maven shade plugin (preferred) or the Maven assembly plugin (not
+recommended).
+
+#### Flux Maven Dependency
+The current version of Flux is available in Maven Central at the following coordinates:
+```xml
+<dependency>
+    <groupId>org.apache.storm</groupId>
+    <artifactId>flux-core</artifactId>
+    <version>${storm.version}</version>
+</dependency>
+```
+
+#### Creating a Flux-Enabled Topology JAR
+The example below illustrates Flux usage with the Maven shade plugin:
+
+ ```xml
+<!-- include Flux and user dependencies in the shaded jar -->
+<dependencies>
+    <!-- Flux include -->
+    <dependency>
+        <groupId>org.apache.storm</groupId>
+        <artifactId>flux-core</artifactId>
+        <version>${storm.version}</version>
+    </dependency>
+
+    <!-- add user dependencies here... -->
+
+</dependencies>
+<!-- create a fat jar that includes all dependencies -->
+<build>
+    <plugins>
+        <plugin>
+            <groupId>org.apache.maven.plugins</groupId>
+            <artifactId>maven-shade-plugin</artifactId>
+            <version>1.4</version>
+            <configuration>
+                <createDependencyReducedPom>true</createDependencyReducedPom>
+            </configuration>
+            <executions>
+                <execution>
+                    <phase>package</phase>
+                    <goals>
+                        <goal>shade</goal>
+                    </goals>
+                    <configuration>
+                        <transformers>
+                            <transformer
+                                    implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
+                            <transformer
+                                    implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
+                                <mainClass>org.apache.storm.flux.Flux</mainClass>
+                            </transformer>
+                        </transformers>
+                    </configuration>
+                </execution>
+            </executions>
+        </plugin>
+    </plugins>
+</build>
+ ```
+
+### Deploying and Running a Flux Topology
+Once your topology components are packaged with the Flux dependency, you can run different topologies either locally
+or remotely using the `storm jar` command. For example, if your fat jar is named `myTopology-0.1.0-SNAPSHOT.jar` you
+could run it locally with the command:
+
+
+```bash
+storm jar myTopology-0.1.0-SNAPSHOT.jar org.apache.storm.flux.Flux --local my_config.yaml
+
+```
+
+### Command line options
+```
+usage: storm jar <my_topology_uber_jar.jar> org.apache.storm.flux.Flux
+             [options] <topology-config.yaml>
+ -d,--dry-run                 Do not run or deploy the topology. Just
+                              build, validate, and print information about
+                              the topology.
+ -e,--env-filter              Perform environment variable substitution.
+                              Keys identified with `${ENV-[NAME]}` will be
+                              replaced with the corresponding `NAME`
+                              environment value.
+ -f,--filter <file>           Perform property substitution. Use the
+                              specified file as a source of properties,
+                              and replace keys identified with {$[property
+                              name]} with the value defined in the
+                              properties file.
+ -i,--inactive                Deploy the topology, but do not activate it.
+ -l,--local                   Run the topology in local mode.
+ -n,--no-splash               Suppress the printing of the splash screen.
+ -q,--no-detail               Suppress the printing of topology details.
+ -r,--remote                  Deploy the topology to a remote cluster.
+ -R,--resource                Treat the supplied path as a classpath
+                              resource instead of a file.
+ -s,--sleep <ms>              When running locally, the amount of time to
+                              sleep (in ms.) before killing the topology
+                              and shutting down the local cluster.
+ -z,--zookeeper <host:port>   When running in local mode, use the
+                              ZooKeeper at the specified <host>:<port>
+                              instead of the in-process ZooKeeper.
+                              (requires Storm 0.9.3 or later)
+```
+
+**NOTE:** Flux tries to avoid command line switch collision with the `storm` command, and allows any other command line
+switches to pass through to the `storm` command.
+
+For example, you can use the `storm` command switch `-c` to override a topology configuration property. The following
+example command will run Flux and override the `nimbus.host` configuration:
+
+```bash
+storm jar myTopology-0.1.0-SNAPSHOT.jar org.apache.storm.flux.Flux --remote my_config.yaml -c nimbus.host=localhost
+```
+
+### Sample output
+```
+███████╗██╗     ██╗   ██╗██╗  ██╗
+██╔════╝██║     ██║   ██║╚██╗██╔╝
+█████╗  ██║     ██║   ██║ ╚███╔╝
+██╔══╝  ██║     ██║   ██║ ██╔██╗
+██║     ███████╗╚██████╔╝██╔╝ ██╗
+╚═╝     ╚══════╝ ╚═════╝ ╚═╝  ╚═╝
++-         Apache Storm        -+
++-  data FLow User eXperience  -+
+Version: 0.3.0
+Parsing file: /Users/hsimpson/Projects/donut_domination/storm/shell_test.yaml
+---------- TOPOLOGY DETAILS ----------
+Name: shell-topology
+--------------- SPOUTS ---------------
+sentence-spout[1](org.apache.storm.flux.spouts.GenericShellSpout)
+---------------- BOLTS ---------------
+splitsentence[1](org.apache.storm.flux.bolts.GenericShellBolt)
+log[1](org.apache.storm.flux.wrappers.bolts.LogInfoBolt)
+count[1](backtype.storm.testing.TestWordCounter)
+--------------- STREAMS ---------------
+sentence-spout --SHUFFLE--> splitsentence
+splitsentence --FIELDS--> count
+count --SHUFFLE--> log
+--------------------------------------
+Submitting topology: 'shell-topology' to remote cluster...
+```
+
+## YAML Configuration
+Flux topologies are defined in a YAML file that describes a topology. A Flux topology
+definition consists of the following:
+
+  1. A topology name
+  2. A list of topology "components" (named Java objects that will be made available in the environment)
+  3. **EITHER** (A DSL topology definition):
+      * A list of spouts, each identified by a unique ID
+      * A list of bolts, each identified by a unique ID
+      * A list of "stream" objects representing a flow of tuples between spouts and bolts
+  4. **OR** (A JVM class that can produce a `backtype.storm.generated.StormTopology` instance):
+      * A `topologySource` definition.
+
+
+
+For example, here is a simple definition of a wordcount topology using the YAML DSL:
+
+```yaml
+name: "yaml-topology"
+config:
+  topology.workers: 1
+
+# spout definitions
+spouts:
+  - id: "spout-1"
+    className: "backtype.storm.testing.TestWordSpout"
+    parallelism: 1
+
+# bolt definitions
+bolts:
+  - id: "bolt-1"
+    className: "backtype.storm.testing.TestWordCounter"
+    parallelism: 1
+  - id: "bolt-2"
+    className: "org.apache.storm.flux.wrappers.bolts.LogInfoBolt"
+    parallelism: 1
+
+#stream definitions
+streams:
+  - name: "spout-1 --> bolt-1" # name isn't used (placeholder for logging, UI, etc.)
+    from: "spout-1"
+    to: "bolt-1"
+    grouping:
+      type: FIELDS
+      args: ["word"]
+
+  - name: "bolt-1 --> bolt2"
+    from: "bolt-1"
+    to: "bolt-2"
+    grouping:
+      type: SHUFFLE
+
+
+```
+## Property Substitution/Filtering
+It's common for developers to want to easily switch between configurations, for example switching deployment between
+a development environment and a production environment. This can be accomplished by using separate YAML configuration
+files, but that approach would lead to unnecessary duplication, especially in situations where the Storm topology
+does not change, but configuration settings such as host names, ports, and parallelism parameters do.
+
+For this case, Flux offers properties filtering to allow you to externalize values to a `.properties` file and have
+them substituted before the `.yaml` file is parsed.
+
+To enable property filtering, use the `--filter` command line option and specify a `.properties` file. For example,
+if you invoked Flux like so:
+
+```bash
+storm jar myTopology-0.1.0-SNAPSHOT.jar org.apache.storm.flux.Flux --local my_config.yaml --filter dev.properties
+```
+With the following `dev.properties` file:
+
+```properties
+kafka.zookeeper.hosts: localhost:2181
+```
+
+You would then be able to reference those properties by key in your `.yaml` file using `${}` syntax:
+
+```yaml
+  - id: "zkHosts"
+    className: "storm.kafka.ZkHosts"
+    constructorArgs:
+      - "${kafka.zookeeper.hosts}"
+```
+
+In this case, Flux would replace `${kafka.zookeeper.hosts}` with `localhost:2181` before parsing the YAML contents.
+
+### Environment Variable Substitution/Filtering
+Flux also allows environment variable substitution. For example, if an environment variable named `ZK_HOSTS` is defined,
+you can reference it in a Flux YAML file with the following syntax:
+
+```
+${ENV-ZK_HOSTS}
+```
+
+## Components
+Components are essentially named object instances that are made available as configuration options for spouts and
+bolts. If you are familiar with the Spring framework, components are roughly analogous to Spring beans.
+
+Every component is identified, at a minimum, by a unique identifier (String) and a class name (String). For example,
+the following will make an instance of the `storm.kafka.StringScheme` class available as a reference under the key
+`"stringScheme"`. This assumes the `storm.kafka.StringScheme` class has a default constructor.
+
+```yaml
+components:
+  - id: "stringScheme"
+    className: "storm.kafka.StringScheme"
+```
+
+### Constructor Arguments, References, Properties and Configuration Methods
+
+#### Constructor Arguments
+Arguments to a class constructor can be configured by adding a `constructorArgs` element to a component definition.
+`constructorArgs` is a list of objects that will be passed to the class' constructor. The following example creates an
+object by calling the constructor that takes a single string as an argument:
+
+```yaml
+  - id: "zkHosts"
+    className: "storm.kafka.ZkHosts"
+    constructorArgs:
+      - "localhost:2181"
+```
+
+#### References
+Each component instance is identified by a unique id that allows it to be used/reused by other components. To
+reference an existing component, you specify the id of the component with the `ref` tag.
+
+In the following example, a component with the id `"stringScheme"` is created, and later referenced, as an argument
+to another component's constructor:
+
+```yaml
+components:
+  - id: "stringScheme"
+    className: "storm.kafka.StringScheme"
+
+  - id: "stringMultiScheme"
+    className: "backtype.storm.spout.SchemeAsMultiScheme"
+    constructorArgs:
+      - ref: "stringScheme" # component with id "stringScheme" must be declared above.
+```
+**N.B.:** References can only be used after (below) the object they point to has been declared.
+
+#### Properties
+In addition to calling constructors with different arguments, Flux also allows you to configure components using
+JavaBean-like setter methods and fields declared as `public`:
+
+```yaml
+  - id: "spoutConfig"
+    className: "storm.kafka.SpoutConfig"
+    constructorArgs:
+      # brokerHosts
+      - ref: "zkHosts"
+      # topic
+      - "myKafkaTopic"
+      # zkRoot
+      - "/kafkaSpout"
+      # id
+      - "myId"
+    properties:
+      - name: "forceFromStart"
+        value: true
+      - name: "scheme"
+        ref: "stringMultiScheme"
+```
+
+In the example above, the `properties` declaration will cause Flux to look for a public method in the `SpoutConfig` with
+the signature `setForceFromStart(boolean b)` and attempt to invoke it. If a setter method is not found, Flux will then
+look for a public instance variable with the name `forceFromStart` and attempt to set its value.
+
+References may also be used as property values.
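+
+For illustration, here is a minimal sketch of a hypothetical component class (not part of Flux or storm-kafka) that
+would satisfy both resolution strategies described above:
+
+```java
+// Hypothetical class for illustration only. Flux prefers the setter; if the
+// setter were removed, it would fall back to assigning the public field directly.
+public class ExampleComponent {
+    public boolean forceFromStart;   // public field fallback
+    private Object scheme;
+
+    public void setForceFromStart(boolean forceFromStart) { // matched by the "forceFromStart" property
+        this.forceFromStart = forceFromStart;
+    }
+
+    public void setScheme(Object scheme) { // matched by the "scheme" property (set via a reference)
+        this.scheme = scheme;
+    }
+}
+```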
+
+#### Configuration Methods
+Conceptually, configuration methods are similar to Properties and Constructor Args -- they allow you to invoke an
+arbitrary method on an object after it is constructed. Configuration methods are useful for working with classes that
+don't expose JavaBean methods or have constructors that can fully configure the object. Common examples include classes
+that use the builder pattern for configuration/composition.
+
+The following YAML example creates a bolt and configures it by calling several methods:
+
+```yaml
+bolts:
+  - id: "bolt-1"
+    className: "org.apache.storm.flux.test.TestBolt"
+    parallelism: 1
+    configMethods:
+      - name: "withFoo"
+        args:
+          - "foo"
+      - name: "withBar"
+        args:
+          - "bar"
+      - name: "withFooBar"
+        args:
+          - "foo"
+          - "bar"
+```
+
+The signatures of the corresponding methods are as follows:
+
+```java
+    public void withFoo(String foo);
+    public void withBar(String bar);
+    public void withFooBar(String foo, String bar);
+```
+
+Arguments passed to configuration methods work much the same way as constructor arguments, and support references as
+well.
+
+### Using Java `enum`s in Constructor Arguments, References, Properties and Configuration Methods
+You can easily use Java `enum` values as arguments in a Flux YAML file, simply by referencing the name of the `enum`.
+
+For example, [Storm's HDFS module]() includes the following `enum` definition (simplified for brevity):
+
+```java
+public static enum Units {
+    KB, MB, GB, TB
+}
+```
+
+And the `org.apache.storm.hdfs.bolt.rotation.FileSizeRotationPolicy` class has the following constructor:
+
+```java
+public FileSizeRotationPolicy(float count, Units units)
+
+```
+The following Flux `component` definition could be used to call the constructor:
+
+```yaml
+  - id: "rotationPolicy"
+    className: "org.apache.storm.hdfs.bolt.rotation.FileSizeRotationPolicy"
+    constructorArgs:
+      - 5.0
+      - MB
+```
+
+The above definition is functionally equivalent to the following Java code:
+
+```java
+// rotate files when they reach 5MB
+FileRotationPolicy rotationPolicy = new FileSizeRotationPolicy(5.0f, Units.MB);
+```
+
+## Topology Config
+The `config` section is simply a map of Storm topology configuration parameters that will be passed to the
+`backtype.storm.StormSubmitter` as an instance of the `backtype.storm.Config` class:
+
+```yaml
+config:
+  topology.workers: 4
+  topology.max.spout.pending: 1000
+  topology.message.timeout.secs: 30
+```
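+
+For comparison, the above YAML is roughly equivalent to building the configuration in Java with the standard
+`backtype.storm.Config` helper methods (this snippet is illustrative only; Flux does not require you to write it):
+
+```java
+Config conf = new Config();
+conf.setNumWorkers(4);
+conf.setMaxSpoutPending(1000);
+conf.setMessageTimeoutSecs(30);
+```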
+
+# Existing Topologies
+If you have existing Storm topologies, you can still use Flux to deploy/run/test them. This feature allows you to
+leverage Flux Constructor Arguments, References, Properties, and Topology Config declarations for existing topology
+classes.
+
+The easiest way to use an existing topology class is to define
+a `getTopology()` instance method with one of the following signatures:
+
+```java
+public StormTopology getTopology(Map<String, Object> config)
+```
+or:
+
+```java
+public StormTopology getTopology(Config config)
+```
+
+You could then use the following YAML to configure your topology:
+
+```yaml
+name: "existing-topology"
+topologySource:
+  className: "org.apache.storm.flux.test.SimpleTopology"
+```
+
+If the class you would like to use as a topology source has a different method name (i.e. not `getTopology`), you can
+override it:
+
+```yaml
+name: "existing-topology"
+topologySource:
+  className: "org.apache.storm.flux.test.SimpleTopology"
+  methodName: "getTopologyWithDifferentMethodName"
+```
+
+__N.B.:__ The specified method must accept a single argument of type `java.util.Map<String, Object>` or
+`backtype.storm.Config`, and return a `backtype.storm.generated.StormTopology` object.
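+
+For illustration, here is a minimal sketch of what such a topology source class might look like (hypothetical; the
+actual `org.apache.storm.flux.test.SimpleTopology` class used above may differ):
+
+```java
+import java.util.Map;
+
+import backtype.storm.generated.StormTopology;
+import backtype.storm.testing.TestWordCounter;
+import backtype.storm.testing.TestWordSpout;
+import backtype.storm.topology.TopologyBuilder;
+
+public class SimpleTopology {
+    public StormTopology getTopology(Map<String, Object> config) {
+        // build the topology in plain Java; Flux only needs the resulting StormTopology
+        TopologyBuilder builder = new TopologyBuilder();
+        builder.setSpout("word-spout", new TestWordSpout(), 1);
+        builder.setBolt("count-bolt", new TestWordCounter(), 1).shuffleGrouping("word-spout");
+        return builder.createTopology();
+    }
+}
+```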
+
+# YAML DSL
+## Spouts and Bolts
+Spouts and bolts are configured in their own respective sections of the YAML configuration. Spout and bolt definitions
+are extensions to the `component` definition that add a `parallelism` parameter, which sets the parallelism for a
+component when the topology is deployed.
+
+Because spout and bolt definitions extend `component` they support constructor arguments, references, and properties as
+well.
+
+Shell spout example:
+
+```yaml
+spouts:
+  - id: "sentence-spout"
+    className: "org.apache.storm.flux.spouts.GenericShellSpout"
+    # shell spout constructor takes 2 arguments: String[], String[]
+    constructorArgs:
+      # command line
+      - ["node", "randomsentence.js"]
+      # output fields
+      - ["word"]
+    parallelism: 1
+```
+
+Kafka spout example:
+
+```yaml
+components:
+  - id: "stringScheme"
+    className: "storm.kafka.StringScheme"
+
+  - id: "stringMultiScheme"
+    className: "backtype.storm.spout.SchemeAsMultiScheme"
+    constructorArgs:
+      - ref: "stringScheme"
+
+  - id: "zkHosts"
+    className: "storm.kafka.ZkHosts"
+    constructorArgs:
+      - "localhost:2181"
+
+# Alternative kafka config
+#  - id: "kafkaConfig"
+#    className: "storm.kafka.KafkaConfig"
+#    constructorArgs:
+#      # brokerHosts
+#      - ref: "zkHosts"
+#      # topic
+#      - "myKafkaTopic"
+#      # clientId (optional)
+#      - "myKafkaClientId"
+
+  - id: "spoutConfig"
+    className: "storm.kafka.SpoutConfig"
+    constructorArgs:
+      # brokerHosts
+      - ref: "zkHosts"
+      # topic
+      - "myKafkaTopic"
+      # zkRoot
+      - "/kafkaSpout"
+      # id
+      - "myId"
+    properties:
+      - name: "forceFromStart"
+        value: true
+      - name: "scheme"
+        ref: "stringMultiScheme"
+
+config:
+  topology.workers: 1
+
+# spout definitions
+spouts:
+  - id: "kafka-spout"
+    className: "storm.kafka.KafkaSpout"
+    constructorArgs:
+      - ref: "spoutConfig"
+
+```
+
+Bolt Examples:
+
+```yaml
+# bolt definitions
+bolts:
+  - id: "splitsentence"
+    className: "org.apache.storm.flux.bolts.GenericShellBolt"
+    constructorArgs:
+      # command line
+      - ["python", "splitsentence.py"]
+      # output fields
+      - ["word"]
+    parallelism: 1
+    # ...
+
+  - id: "log"
+    className: "org.apache.storm.flux.wrappers.bolts.LogInfoBolt"
+    parallelism: 1
+    # ...
+
+  - id: "count"
+    className: "backtype.storm.testing.TestWordCounter"
+    parallelism: 1
+    # ...
+```
+## Streams and Stream Groupings
+Streams in Flux are represented as a list of connections (Graph edges, data flow, etc.) between the Spouts and Bolts in
+a topology, with an associated Grouping definition.
+
+A Stream definition has the following properties:
+
+**`name`:** A name for the connection (optional, currently unused)
+
+**`from`:** The `id` of a Spout or Bolt that is the source (publisher)
+
+**`to`:** The `id` of a Spout or Bolt that is the destination (subscriber)
+
+**`grouping`:** The stream grouping definition for the Stream
+
+A Grouping definition has the following properties:
+
+**`type`:** The type of grouping. One of `ALL`,`CUSTOM`,`DIRECT`,`SHUFFLE`,`LOCAL_OR_SHUFFLE`,`FIELDS`,`GLOBAL`, or `NONE`.
+
+**`streamId`:** The Storm stream ID (Optional. If unspecified will use the default stream)
+
+**`args`:** For the `FIELDS` grouping, a list of field names.
+
+**`customClass`:** For the `CUSTOM` grouping, the definition of a custom grouping class instance
+
+The `streams` definition example below sets up a topology with the following wiring:
+
+```
+    kafka-spout --> splitsentence --> count --> log
+```
+
+
+```yaml
+#stream definitions
+# stream definitions define connections between spouts and bolts.
+# note that such connections can be cyclical
+# custom stream groupings are also supported
+
+streams:
+  - name: "kafka --> split" # name isn't used (placeholder for logging, UI, etc.)
+    from: "kafka-spout"
+    to: "splitsentence"
+    grouping:
+      type: SHUFFLE
+
+  - name: "split --> count"
+    from: "splitsentence"
+    to: "count"
+    grouping:
+      type: FIELDS
+      args: ["word"]
+
+  - name: "count --> log"
+    from: "count"
+    to: "log"
+    grouping:
+      type: SHUFFLE
+```
+
+### Custom Stream Groupings
+Custom stream groupings are defined by setting the grouping type to `CUSTOM` and defining a `customClass` parameter
+that tells Flux how to instantiate the custom class. The `customClass` definition extends `component`, so it supports
+constructor arguments, references, and properties as well.
+
+The example below creates a Stream with an instance of the `backtype.storm.testing.NGrouping` custom stream grouping
+class.
+
+```yaml
+  - name: "bolt-1 --> bolt2"
+    from: "bolt-1"
+    to: "bolt-2"
+    grouping:
+      type: CUSTOM
+      customClass:
+        className: "backtype.storm.testing.NGrouping"
+        constructorArgs:
+          - 1
+```
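+
+For reference, a custom grouping class referenced via `customClass` implements Storm's
+`backtype.storm.grouping.CustomStreamGrouping` interface. A minimal hedged sketch follows (the class itself is
+hypothetical, unlike `NGrouping`, which ships with Storm's test code):
+
+```java
+import java.util.Arrays;
+import java.util.List;
+
+import backtype.storm.generated.GlobalStreamId;
+import backtype.storm.grouping.CustomStreamGrouping;
+import backtype.storm.task.WorkerTopologyContext;
+
+public class FirstTaskGrouping implements CustomStreamGrouping {
+    private List<Integer> targetTasks;
+
+    @Override
+    public void prepare(WorkerTopologyContext context, GlobalStreamId stream, List<Integer> targetTasks) {
+        this.targetTasks = targetTasks;
+    }
+
+    @Override
+    public List<Integer> chooseTasks(int taskId, List<Object> values) {
+        // illustrative only: always route tuples to the first target task
+        return Arrays.asList(targetTasks.get(0));
+    }
+}
+```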
+
+## Includes and Overrides
+Flux allows you to include the contents of other YAML files, and have them treated as though they were defined in the
+same file. Includes may be either files, or classpath resources.
+
+Includes are specified as a list of maps:
+
+```yaml
+includes:
+  - resource: false
+    file: "src/test/resources/configs/shell_test.yaml"
+    override: false
+```
+
+If the `resource` property is set to `true`, the include will be loaded as a classpath resource from the value of the
+`file` attribute, otherwise it will be treated as a regular file.
+
+The `override` property controls how includes affect the values defined in the current file. If `override` is set to
+`true`, values in the included file will replace values in the current file being parsed. If `override` is set to
+`false`, values in the current file being parsed will take precedence, and the parser will refuse to replace them.
+
+**N.B.:** Includes are not yet recursive. Includes from included files will be ignored.
+
+
+## Basic Word Count Example
+
+This example uses a spout implemented in JavaScript, a bolt implemented in Python, and a bolt implemented in Java.
+
+Topology YAML config:
+
+```yaml
+---
+name: "shell-topology"
+config:
+  topology.workers: 1
+
+# spout definitions
+spouts:
+  - id: "sentence-spout"
+    className: "org.apache.storm.flux.spouts.GenericShellSpout"
+    # shell spout constructor takes 2 arguments: String[], String[]
+    constructorArgs:
+      # command line
+      - ["node", "randomsentence.js"]
+      # output fields
+      - ["word"]
+    parallelism: 1
+
+# bolt definitions
+bolts:
+  - id: "splitsentence"
+    className: "org.apache.storm.flux.bolts.GenericShellBolt"
+    constructorArgs:
+      # command line
+      - ["python", "splitsentence.py"]
+      # output fields
+      - ["word"]
+    parallelism: 1
+
+  - id: "log"
+    className: "org.apache.storm.flux.wrappers.bolts.LogInfoBolt"
+    parallelism: 1
+
+  - id: "count"
+    className: "backtype.storm.testing.TestWordCounter"
+    parallelism: 1
+
+#stream definitions
+# stream definitions define connections between spouts and bolts.
+# note that such connections can be cyclical
+# custom stream groupings are also supported
+
+streams:
+  - name: "spout --> split" # name isn't used (placeholder for logging, UI, etc.)
+    from: "sentence-spout"
+    to: "splitsentence"
+    grouping:
+      type: SHUFFLE
+
+  - name: "split --> count"
+    from: "splitsentence"
+    to: "count"
+    grouping:
+      type: FIELDS
+      args: ["word"]
+
+  - name: "count --> log"
+    from: "count"
+    to: "log"
+    grouping:
+      type: SHUFFLE
+```
+
+
+## Micro-Batching (Trident) API Support
+Currently, the Flux YAML DSL only supports the Core Storm API, but support for Storm's micro-batching API is planned.
+
+To use Flux with a Trident topology, define a topology getter method and reference it in your YAML config:
+
+```yaml
+name: "my-trident-topology"
+
+config:
+  topology.workers: 1
+
+topologySource:
+  className: "org.apache.storm.flux.test.TridentTopologySource"
+  # Flux will look for "getTopology" by default; this overrides that.
+  methodName: "getTopologyWithDifferentMethodName"
+```

http://git-wip-us.apache.org/repos/asf/storm/blob/af6d8ffc/documentation/storm-eventhubs.md
----------------------------------------------------------------------
diff --git a/documentation/storm-eventhubs.md b/documentation/storm-eventhubs.md
new file mode 100644
index 0000000..4af8c43
--- /dev/null
+++ b/documentation/storm-eventhubs.md
@@ -0,0 +1,40 @@
+---
+title: Azure Event Hubs Integration
+layout: documentation
+documentation: true
+---
+
+Storm spout and bolt implementations for Microsoft Azure Event Hubs.
+
+### build ###
+	mvn clean package
+
+### run sample topology ###
+To run the sample topology, you need to modify the config.properties file with
+the Event Hubs configuration. Here is an example:
+
+	eventhubspout.username = [username: policy name in EventHubs Portal]
+	eventhubspout.password = [password: shared access key in EventHubs Portal]
+	eventhubspout.namespace = [namespace]
+	eventhubspout.entitypath = [entitypath]
+	eventhubspout.partitions.count = [partitioncount]
+
+	# if not provided, will use storm's zookeeper settings
+	# zookeeper.connectionstring=zookeeper0:2181,zookeeper1:2181,zookeeper2:2181
+
+	eventhubspout.checkpoint.interval = 10
+	eventhub.receiver.credits = 1024
+
+Then you can use storm.cmd to submit the sample topology:
+	storm jar {jarfile} com.microsoft.eventhubs.samples.EventCount {topologyname} {spoutconffile}
+	where the {jarfile} should be: eventhubs-storm-spout-{version}-jar-with-dependencies.jar
+
+### Run EventHubSendClient ###
+We have included a simple EventHubs send client for testing purposes. You can run the client like this:
+	java -cp .\target\eventhubs-storm-spout-{version}-jar-with-dependencies.jar com.microsoft.eventhubs.client.EventHubSendClient
+ 	[username] [password] [entityPath] [partitionId] [messageSize] [messageCount]
+If you want to send messages to all partitions, use "-1" as partitionId.
+
+### Windows Azure Eventhubs ###
+	http://azure.microsoft.com/en-us/services/event-hubs/
+

http://git-wip-us.apache.org/repos/asf/storm/blob/af6d8ffc/documentation/storm-hbase.md
----------------------------------------------------------------------
diff --git a/documentation/storm-hbase.md b/documentation/storm-hbase.md
new file mode 100644
index 0000000..7f4fb62
--- /dev/null
+++ b/documentation/storm-hbase.md
@@ -0,0 +1,241 @@
+---
+title: Storm HBase Integration
+layout: documentation
+documentation: true
+---
+
+Storm/Trident integration for [Apache HBase](https://hbase.apache.org)
+
+## Usage
+The main API for interacting with HBase is the `org.apache.storm.hbase.bolt.mapper.HBaseMapper`
+interface:
+
+```java
+public interface HBaseMapper extends Serializable {
+    byte[] rowKey(Tuple tuple);
+
+    ColumnList columns(Tuple tuple);
+}
+```
+
+The `rowKey()` method is straightforward: given a Storm tuple, return a byte array representing the
+row key.
+
+The `columns()` method defines what will be written to an HBase row. The `ColumnList` class allows you
+to add both standard HBase columns as well as HBase counter columns.
+
+To add a standard column, use one of the `addColumn()` methods:
+
+```java
+ColumnList cols = new ColumnList();
+cols.addColumn(this.columnFamily, field.getBytes(), toBytes(tuple.getValueByField(field)));
+```
+
+To add a counter column, use one of the `addCounter()` methods:
+
+```java
+ColumnList cols = new ColumnList();
+cols.addCounter(this.columnFamily, field.getBytes(), toLong(tuple.getValueByField(field)));
+```
+
+When the remote HBase is security enabled, a kerberos keytab and the corresponding principal name need to be
+provided for the storm-hbase connector. Specifically, the Config object passed into the topology should contain
+{("storm.keytab.file", "$keytab"), ("storm.kerberos.principal", "$principal")}. Example:
+
+```java
+Config config = new Config();
+...
+config.put("storm.keytab.file", "$keytab");
+config.put("storm.kerberos.principal", "$principal");
+StormSubmitter.submitTopology("$topologyName", config, builder.createTopology());
+```
+
+## Working with Secure HBase Using Delegation Tokens
+If your topology is going to interact with secure HBase, your bolts/states need to be authenticated by HBase.
+The approach described above requires that all potential worker hosts have the "storm.keytab.file" on them. If you have
+multiple topologies on a cluster, each running as a different HBase user, you will have to create multiple keytabs and
+distribute them to all workers. Instead of doing that you could use the following approach:
+
+Your administrator can configure Nimbus to automatically get delegation tokens on behalf of the topology submitter user.
+Nimbus needs to be started with the following configuration:
+
+nimbus.autocredential.plugins.classes : ["org.apache.storm.hbase.security.AutoHBase"]
+nimbus.credential.renewers.classes : ["org.apache.storm.hbase.security.AutoHBase"]
+hbase.keytab.file: "/path/to/keytab/on/nimbus" (This is the keytab of the HBase super user that can impersonate other users.)
+hbase.kerberos.principal: "superuser@EXAMPLE.com"
+nimbus.credential.renewers.freq.secs : 518400 (6 days. HBase tokens by default expire every 7 days and cannot be renewed;
+if you have a custom setting for hbase.auth.token.max.lifetime in hbase-site.xml, then you should ensure this value is
+at least 1 hour less than that.)
+
+Your topology configuration should have:
+topology.auto-credentials :["org.apache.storm.hbase.security.AutoHBase"] 
+
+If Nimbus does not have the above configuration, you need to add it and then restart it. Ensure the HBase configuration
+files (core-site.xml, hdfs-site.xml and hbase-site.xml) and the storm-hbase jar with all its dependencies are present in
+Nimbus's classpath. Nimbus will use the keytab and principal specified in the config to authenticate with HBase. From then
+on, for every topology submission, Nimbus will impersonate the topology submitter user and acquire delegation tokens on
+that user's behalf. If the topology was started with topology.auto-credentials set to AutoHBase, Nimbus will push the
+delegation tokens to all the workers for your topology, and the HBase bolt/state will authenticate with these tokens.
+
+As Nimbus is impersonating the topology submitter user, you need to ensure that the user specified in storm.kerberos.principal
+has permissions to acquire tokens on behalf of other users. To achieve this, follow the configuration directions
+listed at this link:
+
+http://hbase.apache.org/book/security.html#security.rest.gateway
+
+You can read about setting up secure HBase here: http://hbase.apache.org/book/security.html
+
+### SimpleHBaseMapper
+`storm-hbase` includes a general purpose `HBaseMapper` implementation called `SimpleHBaseMapper` that can map Storm
+tuples to both regular HBase columns as well as counter columns.
+
+To use `SimpleHBaseMapper`, you simply tell it which fields to map to which types of columns.
+
+The following code creates a `SimpleHBaseMapper` instance that:
+
+1. Uses the `word` tuple value as a row key.
+2. Adds a standard HBase column for the tuple field `word`.
+3. Adds an HBase counter column for the tuple field `count`.
+4. Writes values to the `cf` column family.
+
+```java
+SimpleHBaseMapper mapper = new SimpleHBaseMapper() 
+        .withRowKeyField("word")
+        .withColumnFields(new Fields("word"))
+        .withCounterFields(new Fields("count"))
+        .withColumnFamily("cf");
+```
+### HBaseBolt
+To use the `HBaseBolt`, construct it with the name of the table to write to, and an `HBaseMapper` implementation:
+
+ ```java
+HBaseBolt hbase = new HBaseBolt("WordCount", mapper);
+ ```
+
+The `HBaseBolt` will delegate to the `mapper` instance to figure out how to persist tuple data to HBase.
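+
+For illustration, here is a minimal sketch of a custom `HBaseMapper` for word-count style tuples. The tuple field
+names and column family are assumptions for the example, and the `ColumnList` import assumes the class lives in
+`org.apache.storm.hbase.common`:
+
+```java
+import backtype.storm.tuple.Tuple;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.storm.hbase.bolt.mapper.HBaseMapper;
+import org.apache.storm.hbase.common.ColumnList;
+
+public class WordCountHBaseMapper implements HBaseMapper {
+    private final byte[] columnFamily = Bytes.toBytes("cf");
+
+    @Override
+    public byte[] rowKey(Tuple tuple) {
+        // use the "word" field as the row key
+        return Bytes.toBytes(tuple.getStringByField("word"));
+    }
+
+    @Override
+    public ColumnList columns(Tuple tuple) {
+        // store the count as an HBase counter column
+        ColumnList cols = new ColumnList();
+        cols.addCounter(columnFamily, Bytes.toBytes("count"), tuple.getLongByField("count"));
+        return cols;
+    }
+}
+```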
+
+### HBaseValueMapper
+This class allows you to transform the HBase lookup result into storm Values that will be emitted by the `HBaseLookupBolt`.
+
+```java
+public interface HBaseValueMapper extends Serializable {
+    public List<Values> toTuples(Result result) throws Exception;
+    void declareOutputFields(OutputFieldsDeclarer declarer);
+}
+```
+
+The `toTuples` method takes an HBase `Result` instance and returns a list of `Values` instances.
+Each of the values returned by this method will be emitted by the `HBaseLookupBolt`.
+
+The `declareOutputFields` method should be used to declare the output fields of the `HBaseLookupBolt`.
+
+There is an example implementation in the `src/test/java` directory.
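+
+For illustration, a hedged sketch of an `HBaseValueMapper` that emits the `count` counter column of the looked-up row.
+The column family, qualifier, and output field name are assumptions for the example, and the import assumes the
+interface lives alongside `HBaseMapper` in `org.apache.storm.hbase.bolt.mapper`:
+
+```java
+import java.util.ArrayList;
+import java.util.List;
+
+import backtype.storm.topology.OutputFieldsDeclarer;
+import backtype.storm.tuple.Fields;
+import backtype.storm.tuple.Values;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.storm.hbase.bolt.mapper.HBaseValueMapper;
+
+public class WordCountValueMapper implements HBaseValueMapper {
+    @Override
+    public List<Values> toTuples(Result result) throws Exception {
+        List<Values> values = new ArrayList<Values>();
+        // HBase counter columns are stored as 8-byte longs
+        byte[] count = result.getValue(Bytes.toBytes("cf"), Bytes.toBytes("count"));
+        if (count != null) {
+            values.add(new Values(Bytes.toLong(count)));
+        }
+        return values;
+    }
+
+    @Override
+    public void declareOutputFields(OutputFieldsDeclarer declarer) {
+        declarer.declare(new Fields("count"));
+    }
+}
+```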
+
+### HBaseProjectionCriteria
+This class allows you to specify the projection criteria for your HBase Get operation. This is an optional parameter
+for the lookup bolt; if you do not specify it, all columns will be returned by `HBaseLookupBolt`.
+
+```java
+public class HBaseProjectionCriteria implements Serializable {
+    public HBaseProjectionCriteria addColumnFamily(String columnFamily);
+    public HBaseProjectionCriteria addColumn(ColumnMetaData column);
+}
+```
+`addColumnFamily` takes in a column family. Setting this parameter means all columns for this family will be included
+ in the projection.
+
+`addColumn` takes in a `ColumnMetaData` instance. Setting this parameter means only this column from the column family
+ will be part of your projection.
+
+The following code creates an `HBaseProjectionCriteria` that:
+
+1. includes the `count` column from column family `cf`.
+2. includes all columns from column family `cf2`.
+
+```java
+HBaseProjectionCriteria projectionCriteria = new HBaseProjectionCriteria()
+    .addColumn(new HBaseProjectionCriteria.ColumnMetaData("cf", "count"))
+    .addColumnFamily("cf2");
+```
+
+### HBaseLookupBolt
+To use the `HBaseLookupBolt`, construct it with the name of the table to look up against, an implementation of `HBaseMapper`,
+and an implementation of `HBaseValueMapper`. You can optionally specify an `HBaseProjectionCriteria`.
+
+The `HBaseLookupBolt` will use the mapper to get the row key to look up. It will use the `HBaseProjectionCriteria` to
+figure out which columns to include in the result, and it will leverage the `HBaseValueMapper` to get the
+values to be emitted by the bolt.
+
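+A hedged construction sketch (the exact constructor and builder-style method names may differ slightly; see the
+example topology referenced below):
+
+```java
+HBaseLookupBolt lookupBolt = new HBaseLookupBolt("WordCount", mapper, new WordCountValueMapper())
+        .withProjectionCriteria(projectionCriteria);
+```
+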
+You can look at an example topology, `LookupWordCount.java`, under `src/test/java`.
+
+## Example: Persistent Word Count
+A runnable example can be found in the `src/test/java` directory.
+
+### Setup
+The following steps assume you are running HBase locally, or there is an `hbase-site.xml` on the
+classpath pointing to your HBase cluster.
+
+Use the `hbase shell` command to create the schema:
+
+```
+> create 'WordCount', 'cf'
+```
+
+### Execution
+Run the `org.apache.storm.hbase.topology.PersistentWordCount` class (it will run the topology for 10 seconds, then exit).
+
+After (or while) the word count topology is running, run the `org.apache.storm.hbase.topology.WordCountClient` class
+to view the counter values stored in HBase. You should see something like the following:
+
+```
+Word: 'apple', Count: 6867
+Word: 'orange', Count: 6645
+Word: 'pineapple', Count: 6954
+Word: 'banana', Count: 6787
+Word: 'watermelon', Count: 6806
+```
+
+For reference, the sample topology is listed below:
+
+```java
+public class PersistentWordCount {
+    private static final String WORD_SPOUT = "WORD_SPOUT";
+    private static final String COUNT_BOLT = "COUNT_BOLT";
+    private static final String HBASE_BOLT = "HBASE_BOLT";
+
+
+    public static void main(String[] args) throws Exception {
+        Config config = new Config();
+
+        WordSpout spout = new WordSpout();
+        WordCounter bolt = new WordCounter();
+
+        SimpleHBaseMapper mapper = new SimpleHBaseMapper()
+                .withRowKeyField("word")
+                .withColumnFields(new Fields("word"))
+                .withCounterFields(new Fields("count"))
+                .withColumnFamily("cf");
+
+        HBaseBolt hbase = new HBaseBolt("WordCount", mapper);
+
+
+        // wordSpout ==> countBolt ==> HBaseBolt
+        TopologyBuilder builder = new TopologyBuilder();
+
+        builder.setSpout(WORD_SPOUT, spout, 1);
+        builder.setBolt(COUNT_BOLT, bolt, 1).shuffleGrouping(WORD_SPOUT);
+        builder.setBolt(HBASE_BOLT, hbase, 1).fieldsGrouping(COUNT_BOLT, new Fields("word"));
+
+
+        if (args.length == 0) {
+            LocalCluster cluster = new LocalCluster();
+            cluster.submitTopology("test", config, builder.createTopology());
+            Thread.sleep(10000);
+            cluster.killTopology("test");
+            cluster.shutdown();
+            System.exit(0);
+        } else {
+            config.setNumWorkers(3);
+            StormSubmitter.submitTopology(args[0], config, builder.createTopology());
+        }
+    }
+}
+```
+

http://git-wip-us.apache.org/repos/asf/storm/blob/af6d8ffc/documentation/storm-hdfs.md
----------------------------------------------------------------------
diff --git a/documentation/storm-hdfs.md b/documentation/storm-hdfs.md
new file mode 100644
index 0000000..b5bf64d
--- /dev/null
+++ b/documentation/storm-hdfs.md
@@ -0,0 +1,368 @@
+---
+title: Storm HDFS Integration
+layout: documentation
+documentation: true
+---
+
+Storm components for interacting with HDFS file systems
+
+
+## Usage
+The following example will write pipe("|")-delimited files to the HDFS path hdfs://localhost:54310/foo. After every
+1,000 tuples it will sync the filesystem, making that data visible to other HDFS clients. It will rotate files when they
+reach 5 megabytes in size.
+
+```java
+// use "|" instead of "," for field delimiter
+RecordFormat format = new DelimitedRecordFormat()
+        .withFieldDelimiter("|");
+
+// sync the filesystem after every 1k tuples
+SyncPolicy syncPolicy = new CountSyncPolicy(1000);
+
+// rotate files when they reach 5MB
+FileRotationPolicy rotationPolicy = new FileSizeRotationPolicy(5.0f, Units.MB);
+
+FileNameFormat fileNameFormat = new DefaultFileNameFormat()
+        .withPath("/foo/");
+
+HdfsBolt bolt = new HdfsBolt()
+        .withFsUrl("hdfs://localhost:54310")
+        .withFileNameFormat(fileNameFormat)
+        .withRecordFormat(format)
+        .withRotationPolicy(rotationPolicy)
+        .withSyncPolicy(syncPolicy);
+```
+
+### Packaging a Topology
+When packaging your topology, it's important that you use the [maven-shade-plugin]() as opposed to the
+[maven-assembly-plugin]().
+
+The shade plugin provides facilities for merging JAR manifest entries, which the hadoop client leverages for URL scheme
+resolution.
+
+If you experience errors such as the following:
+
+```
+java.lang.RuntimeException: Error preparing HdfsBolt: No FileSystem for scheme: hdfs
+```
+
+it's an indication that your topology jar file isn't packaged properly.
+
+If you are using maven to create your topology jar, you should use the following `maven-shade-plugin` configuration to
+create your topology jar:
+
+```xml
+<plugin>
+    <groupId>org.apache.maven.plugins</groupId>
+    <artifactId>maven-shade-plugin</artifactId>
+    <version>1.4</version>
+    <configuration>
+        <createDependencyReducedPom>true</createDependencyReducedPom>
+    </configuration>
+    <executions>
+        <execution>
+            <phase>package</phase>
+            <goals>
+                <goal>shade</goal>
+            </goals>
+            <configuration>
+                <transformers>
+                    <transformer
+                            implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
+                    <transformer
+                            implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
+                        <mainClass></mainClass>
+                    </transformer>
+                </transformers>
+            </configuration>
+        </execution>
+    </executions>
+</plugin>
+
+```
+
+### Specifying a Hadoop Version
+By default, storm-hdfs uses the following Hadoop dependencies:
+
+```xml
+<dependency>
+    <groupId>org.apache.hadoop</groupId>
+    <artifactId>hadoop-client</artifactId>
+    <version>2.2.0</version>
+    <exclusions>
+        <exclusion>
+            <groupId>org.slf4j</groupId>
+            <artifactId>slf4j-log4j12</artifactId>
+        </exclusion>
+    </exclusions>
+</dependency>
+<dependency>
+    <groupId>org.apache.hadoop</groupId>
+    <artifactId>hadoop-hdfs</artifactId>
+    <version>2.2.0</version>
+    <exclusions>
+        <exclusion>
+            <groupId>org.slf4j</groupId>
+            <artifactId>slf4j-log4j12</artifactId>
+        </exclusion>
+    </exclusions>
+</dependency>
+```
+
+If you are using a different version of Hadoop, you should exclude the Hadoop libraries from the storm-hdfs dependency
+and add the dependencies for your preferred version in your pom.
+
+Hadoop client version incompatibilities can manifest as errors like:
+
+```
+com.google.protobuf.InvalidProtocolBufferException: Protocol message contained an invalid tag (zero)
+```
+
+## Customization
+
+### Record Formats
+Record format can be controlled by providing an implementation of the `org.apache.storm.hdfs.format.RecordFormat`
+interface:
+
+```java
+public interface RecordFormat extends Serializable {
+    byte[] format(Tuple tuple);
+}
+```
+
+The provided `org.apache.storm.hdfs.format.DelimitedRecordFormat` is capable of producing formats such as CSV and
+tab-delimited files.
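+
+If neither suits your data, you can implement the interface yourself. A minimal sketch (using the package names as
+given above; the field layout is an assumption for the example):
+
+```java
+import backtype.storm.tuple.Tuple;
+import org.apache.storm.hdfs.format.RecordFormat;
+
+public class KeyValueRecordFormat implements RecordFormat {
+    @Override
+    public byte[] format(Tuple tuple) {
+        // write each tuple as comma-separated field=value pairs, one record per line
+        StringBuilder sb = new StringBuilder();
+        for (String field : tuple.getFields()) {
+            if (sb.length() > 0) {
+                sb.append(",");
+            }
+            sb.append(field).append("=").append(tuple.getValueByField(field));
+        }
+        sb.append("\n");
+        return sb.toString().getBytes();
+    }
+}
+```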
+
+
+### File Naming
+File naming can be controlled by providing an implementation of the `org.apache.storm.hdfs.format.FileNameFormat`
+interface:
+
+```java
+public interface FileNameFormat extends Serializable {
+    void prepare(Map conf, TopologyContext topologyContext);
+    String getName(long rotation, long timeStamp);
+    String getPath();
+}
+```
+
+The provided `org.apache.storm.hdfs.format.DefaultFileNameFormat`  will create file names with the following format:
+
+     {prefix}{componentId}-{taskId}-{rotationNum}-{timestamp}{extension}
+
+For example:
+
+     MyBolt-5-7-1390579837830.txt
+
+By default, the prefix is empty and the extension is ".txt".
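+
+A `DefaultFileNameFormat` usage sketch combining the builder methods that appear elsewhere in this document:
+
+```java
+// produces names like "data-MyBolt-5-7-1390579837830.txt" under /foo/
+FileNameFormat fileNameFormat = new DefaultFileNameFormat()
+        .withPath("/foo/")
+        .withPrefix("data-")
+        .withExtension(".txt");
+```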
+
+
+
+### Sync Policies
+Sync policies allow you to control when buffered data is flushed to the underlying filesystem (thus making it available
+to clients reading the data) by implementing the `org.apache.storm.hdfs.sync.SyncPolicy` interface:
+
+```java
+public interface SyncPolicy extends Serializable {
+    boolean mark(Tuple tuple, long offset);
+    void reset();
+}
+```
+The `HdfsBolt` will call the `mark()` method for every tuple it processes. Returning `true` will trigger the `HdfsBolt`
+to perform a sync/flush, after which it will call the `reset()` method.
+
+The `org.apache.storm.hdfs.sync.CountSyncPolicy` class simply triggers a sync after the specified number of tuples have
+been processed.
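+
+You can also supply your own policy. A hedged sketch of a hypothetical time-based policy (not part of storm-hdfs),
+using the package name given above:
+
+```java
+import backtype.storm.tuple.Tuple;
+import org.apache.storm.hdfs.sync.SyncPolicy;
+
+public class TimedSyncPolicy implements SyncPolicy {
+    private final long intervalMs;
+    private long lastSync = System.currentTimeMillis();
+
+    public TimedSyncPolicy(long intervalMs) {
+        this.intervalMs = intervalMs;
+    }
+
+    @Override
+    public boolean mark(Tuple tuple, long offset) {
+        // request a sync once the configured interval has elapsed
+        return System.currentTimeMillis() - lastSync >= intervalMs;
+    }
+
+    @Override
+    public void reset() {
+        lastSync = System.currentTimeMillis();
+    }
+}
+```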
+
+### File Rotation Policies
+Similar to sync policies, file rotation policies allow you to control when data files are rotated by providing an
+implementation of the `org.apache.storm.hdfs.rotation.FileRotationPolicy` interface:
+
+```java
+public interface FileRotationPolicy extends Serializable {
+    boolean mark(Tuple tuple, long offset);
+    void reset();
+}
+``` 
+
+The `org.apache.storm.hdfs.rotation.FileSizeRotationPolicy` implementation allows you to trigger file rotation when
+data files reach a specific file size:
+
+```java
+FileRotationPolicy rotationPolicy = new FileSizeRotationPolicy(5.0f, Units.MB);
+```
+
+### File Rotation Actions
+Both the HDFS bolt and the Trident State implementation allow you to register any number of `RotationAction`s.
+A `RotationAction` provides a hook for performing some action right after a file is rotated, for
+example moving the file to a different location or renaming it.
+
+
+```java
+public interface RotationAction extends Serializable {
+    void execute(FileSystem fileSystem, Path filePath) throws IOException;
+}
+```
+
+Storm-HDFS includes a simple action that will move a file after rotation:
+
+```java
+public class MoveFileAction implements RotationAction {
+    private static final Logger LOG = LoggerFactory.getLogger(MoveFileAction.class);
+
+    private String destination;
+
+    public MoveFileAction withDestination(String destDir){
+        destination = destDir;
+        return this;
+    }
+
+    @Override
+    public void execute(FileSystem fileSystem, Path filePath) throws IOException {
+        Path destPath = new Path(destination, filePath.getName());
+        LOG.info("Moving file {} to {}", filePath, destPath);
+        boolean success = fileSystem.rename(filePath, destPath);
+        return;
+    }
+}
+```
+
+If you are using Trident and sequence files you can do something like this:
+
+```java
+        HdfsState.Options seqOpts = new HdfsState.SequenceFileOptions()
+                .withFileNameFormat(fileNameFormat)
+                .withSequenceFormat(new DefaultSequenceFormat("key", "data"))
+                .withRotationPolicy(rotationPolicy)
+                .withFsUrl("hdfs://localhost:54310")
+                .addRotationAction(new MoveFileAction().withDestination("/dest2/"));
+```
+
+
+## Support for HDFS Sequence Files
+
+The `org.apache.storm.hdfs.bolt.SequenceFileBolt` class allows you to write storm data to HDFS sequence files:
+
+```java
+        // sync the filesystem after every 1k tuples
+        SyncPolicy syncPolicy = new CountSyncPolicy(1000);
+
+        // rotate files when they reach 5MB
+        FileRotationPolicy rotationPolicy = new FileSizeRotationPolicy(5.0f, Units.MB);
+
+        FileNameFormat fileNameFormat = new DefaultFileNameFormat()
+                .withExtension(".seq")
+                .withPath("/data/");
+
+        // create sequence format instance.
+        DefaultSequenceFormat format = new DefaultSequenceFormat("timestamp", "sentence");
+
+        SequenceFileBolt bolt = new SequenceFileBolt()
+                .withFsUrl("hdfs://localhost:54310")
+                .withFileNameFormat(fileNameFormat)
+                .withSequenceFormat(format)
+                .withRotationPolicy(rotationPolicy)
+                .withSyncPolicy(syncPolicy)
+                .withCompressionType(SequenceFile.CompressionType.RECORD)
+                .withCompressionCodec("deflate");
+```
+
+The `SequenceFileBolt` requires that you provide a `org.apache.storm.hdfs.bolt.format.SequenceFormat` that maps tuples to
+key/value pairs:
+
+```java
+public interface SequenceFormat extends Serializable {
+    Class keyClass();
+    Class valueClass();
+
+    Writable key(Tuple tuple);
+    Writable value(Tuple tuple);
+}
+```
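+
+A hedged sketch of a custom `SequenceFormat` (the tuple field names are assumptions, matching the
+`DefaultSequenceFormat("timestamp", "sentence")` example above):
+
+```java
+import backtype.storm.tuple.Tuple;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.storm.hdfs.bolt.format.SequenceFormat;
+
+public class TimestampedSentenceFormat implements SequenceFormat {
+    @Override
+    public Class keyClass() {
+        return LongWritable.class;
+    }
+
+    @Override
+    public Class valueClass() {
+        return Text.class;
+    }
+
+    @Override
+    public Writable key(Tuple tuple) {
+        return new LongWritable(tuple.getLongByField("timestamp"));
+    }
+
+    @Override
+    public Writable value(Tuple tuple) {
+        return new Text(tuple.getStringByField("sentence"));
+    }
+}
+```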
+
+## Trident API
+storm-hdfs also includes a Trident `state` implementation for writing data to HDFS, with an API that closely mirrors
+that of the bolts.
+
+ ```java
+         Fields hdfsFields = new Fields("field1", "field2");
+
+         FileNameFormat fileNameFormat = new DefaultFileNameFormat()
+                 .withPath("/trident")
+                 .withPrefix("trident")
+                 .withExtension(".txt");
+
+         RecordFormat recordFormat = new DelimitedRecordFormat()
+                 .withFields(hdfsFields);
+
+         FileRotationPolicy rotationPolicy = new FileSizeRotationPolicy(5.0f, FileSizeRotationPolicy.Units.MB);
+
+        HdfsState.Options options = new HdfsState.HdfsFileOptions()
+                .withFileNameFormat(fileNameFormat)
+                .withRecordFormat(recordFormat)
+                .withRotationPolicy(rotationPolicy)
+                .withFsUrl("hdfs://localhost:54310");
+
+         StateFactory factory = new HdfsStateFactory().withOptions(options);
+
+         TridentState state = stream
+                 .partitionPersist(factory, hdfsFields, new HdfsUpdater(), new Fields());
+ ```
+
+ To use the sequence file `State` implementation, use the `HdfsState.SequenceFileOptions`:
+
+ ```java
+        HdfsState.Options seqOpts = new HdfsState.SequenceFileOptions()
+                .withFileNameFormat(fileNameFormat)
+                .withSequenceFormat(new DefaultSequenceFormat("key", "data"))
+                .withRotationPolicy(rotationPolicy)
+                .withFsUrl("hdfs://localhost:54310")
+                .addRotationAction(new MoveFileAction().withDestination("/dest2/"));
+```
+
+## Working with Secure HDFS
+If your topology is going to interact with secure HDFS, your bolts/states need to be authenticated by the NameNode. We
+currently have two options to support this:
+
+### Using HDFS delegation tokens 
+Your administrator can configure Nimbus to automatically get delegation tokens on behalf of the topology submitter user.
+Nimbus needs to be started with the following configuration:
+
+nimbus.autocredential.plugins.classes : ["org.apache.storm.hdfs.common.security.AutoHDFS"]
+nimbus.credential.renewers.classes : ["org.apache.storm.hdfs.common.security.AutoHDFS"]
+hdfs.keytab.file: "/path/to/keytab/on/nimbus" (This is the keytab of the HDFS super user that can impersonate other users.)
+hdfs.kerberos.principal: "superuser@EXAMPLE.com"
+nimbus.credential.renewers.freq.secs : 82800 (23 hours. HDFS tokens need to be renewed every 24 hours, so this value should
+be less than 24 hours.)
+topology.hdfs.uri: "hdfs://host:port" (This is an optional config; by default we will use the value of the "fs.defaultFS"
+property specified in Hadoop's core-site.xml.)
+
+Your topology configuration should have:
+topology.auto-credentials :["org.apache.storm.hdfs.common.security.AutoHDFS"] 
+
+If Nimbus does not have the above configuration, you need to add it and then restart it. Ensure the Hadoop configuration
+files (core-site.xml and hdfs-site.xml) and the storm-hdfs jar with all its dependencies are present in Nimbus's classpath.
+Nimbus will use the keytab and principal specified in the config to authenticate with the NameNode. From then on, for every
+topology submission, Nimbus will impersonate the topology submitter user and acquire delegation tokens on behalf of the
+topology submitter user. If the topology was started with topology.auto-credentials set to AutoHDFS, Nimbus will push the
+delegation tokens to all the workers for your topology, and the HDFS bolt/state will authenticate with the NameNode using
+these tokens.
+
+As Nimbus is impersonating the topology submitter user, you need to ensure that the user specified in hdfs.kerberos.principal
+has permissions to acquire tokens on behalf of other users. To achieve this, follow the configuration directions
+listed at this link:
+http://hadoop.apache.org/docs/current/hadoop-project-dist/hadoop-common/Superusers.html
+
+You can read about setting up secure HDFS here: http://hadoop.apache.org/docs/current/hadoop-project-dist/hadoop-common/SecureMode.html.
+
+### Using keytabs on all worker hosts
+If you have distributed the keytab files for the HDFS user on all potential worker hosts, then you can use this method. You should specify an
+HDFS config key using the method HdfsBolt/State.withConfigKey("somekey"), and the value map of this key should have the following 2 properties:
+
+hdfs.keytab.file: "/path/to/keytab/"
+hdfs.kerberos.principal: "user@EXAMPLE.com"
+
+On worker hosts the bolt/trident-state code will use the keytab file and principal provided in the config to authenticate with the
+NameNode. This method is a little dangerous, as you need to ensure all workers have the keytab file at the same location, and you need
+to remember this as you bring up new hosts in the cluster.

http://git-wip-us.apache.org/repos/asf/storm/blob/af6d8ffc/documentation/storm-hive.md
----------------------------------------------------------------------
diff --git a/documentation/storm-hive.md b/documentation/storm-hive.md
new file mode 100644
index 0000000..e2dd657
--- /dev/null
+++ b/documentation/storm-hive.md
@@ -0,0 +1,111 @@
+---
+title: Storm Hive Integration
+layout: documentation
+documentation: true
+---
+
+  Hive offers a streaming API that allows data to be written continuously into Hive. The incoming data
+  can be continuously committed in small batches of records into an existing Hive partition or table. Once the data
+  is committed, it is immediately visible to all Hive queries. More info on the Hive Streaming API is available at
+  https://cwiki.apache.org/confluence/display/Hive/Streaming+Data+Ingest
+  
+  With the help of the Hive Streaming API, HiveBolt and HiveState allow users to stream data from Storm into Hive directly.
+  To use the Hive Streaming API, users need to create a bucketed table stored in the ORC format. An example is below.
+  
+  ```sql
+  create table test_table ( id INT, name STRING, phone STRING, street STRING) partitioned by (city STRING, state STRING) stored as orc tblproperties ("orc.compress"="NONE");
+  ```
+  
+
+## HiveBolt (org.apache.storm.hive.bolt.HiveBolt)
+
+HiveBolt streams tuples directly into Hive. Tuples are written using Hive transactions.
+Partitions to which HiveBolt streams can either be pre-created, or HiveBolt can optionally
+create them if they are missing. Fields from tuples are mapped to table columns, so
+users should make sure that tuple field names match the table column names.
+
+```java
+DelimitedRecordHiveMapper mapper = new DelimitedRecordHiveMapper()
+            .withColumnFields(new Fields(colNames));
+HiveOptions hiveOptions = new HiveOptions(metaStoreURI,dbName,tblName,mapper);
+HiveBolt hiveBolt = new HiveBolt(hiveOptions);
+```
+
+### RecordHiveMapper
+   This class maps tuple field names to Hive table column names.
+   There are two implementations available:
+
+   + DelimitedRecordHiveMapper (org.apache.storm.hive.bolt.mapper.DelimitedRecordHiveMapper)
+   + JsonRecordHiveMapper (org.apache.storm.hive.bolt.mapper.JsonRecordHiveMapper)
+   
+   ```java
+   DelimitedRecordHiveMapper mapper = new DelimitedRecordHiveMapper()
+            .withColumnFields(new Fields(colNames))
+            .withPartitionFields(new Fields(partNames));
+   // or
+   DelimitedRecordHiveMapper mapper = new DelimitedRecordHiveMapper()
+            .withColumnFields(new Fields(colNames))
+            .withTimeAsPartitionField("YYYY/MM/DD");
+   ```
+
+|Arg | Description | Type
+|--- |--- |---
+|withColumnFields| field names in a tuple to be mapped to table column names | Fields (required) |
+|withPartitionFields| field names in a tuple to be mapped to hive table partition columns | Fields |
+|withTimeAsPartitionField| use the current system time as the partition value | String (date format, e.g. "YYYY/MM/DD") |
+
+### HiveOptions (org.apache.storm.hive.common.HiveOptions)
+  
+HiveBolt takes in HiveOptions as a constructor arg.
+
+  ```java
+  HiveOptions hiveOptions = new HiveOptions(metaStoreURI, dbName, tblName, mapper)
+                                .withTxnsPerBatch(10)
+                                .withBatchSize(1000)
+                                .withIdleTimeout(10);
+  ```
+
+
+HiveOptions params
+
+|Arg  |Description | Type
+|---	|--- |---
+|metaStoreURI | hive meta store URI (can be found in hive-site.xml) | String (required) |
+|dbName | database name | String (required) |
+|tblName | table name | String (required) |
+|mapper| Mapper class to map Tuple field names to Table column names | DelimitedRecordHiveMapper or JsonRecordHiveMapper (required) |
+|withTxnsPerBatch | Hive grants a *batch of transactions* instead of single transactions to streaming clients like HiveBolt. This setting configures the number of desired transactions per transaction batch. Data from all transactions in a single batch ends up in a single file. HiveBolt will write a maximum of batchSize events in each transaction in the batch. This setting, in conjunction with batchSize, provides control over the size of each file. Note that eventually Hive will transparently compact these files into larger files.| Integer. default 100 |
+|withMaxOpenConnections| Allow only this number of open connections. If this number is exceeded, the least recently used connection is closed.| Integer. default 100|
+|withBatchSize| Max number of events written to Hive in a single Hive transaction| Integer. default 15000|
+|withCallTimeout| (In milliseconds) Timeout for Hive & HDFS I/O operations, such as openTxn, write, commit, abort. | Integer. default 10000|
+|withHeartBeatInterval| (In seconds) Interval between consecutive heartbeats sent to Hive to keep unused transactions from expiring. Set this value to 0 to disable heartbeats.| Integer. default 240 |
+|withAutoCreatePartitions| HiveBolt will automatically create the necessary Hive partitions to stream to. |Boolean. default true |
+|withKerberosPrincipal| Kerberos user principal for accessing secure Hive | String|
+|withKerberosKeytab| Kerberos keytab for accessing secure Hive | String |
+|withTickTupleInterval| (In seconds) If > 0 then the HiveBolt will periodically flush transaction batches. Enabling this is recommended to avoid tuple timeouts while waiting for a batch to fill up.| Integer. default 0|
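+
+Putting the pieces together, a sketch of wiring HiveBolt into a topology (the spout class, component ids, and
+parallelism are illustrative; `metaStoreURI`, `dbName`, and `tblName` are assumed to be defined as above):
+
+```java
+// Map the tuple fields to the example test_table columns and partitions.
+DelimitedRecordHiveMapper mapper = new DelimitedRecordHiveMapper()
+        .withColumnFields(new Fields("id", "name", "phone", "street"))
+        .withPartitionFields(new Fields("city", "state"));
+
+HiveOptions hiveOptions = new HiveOptions(metaStoreURI, dbName, tblName, mapper)
+        .withTxnsPerBatch(10)
+        .withBatchSize(1000)
+        .withIdleTimeout(10);
+
+TopologyBuilder builder = new TopologyBuilder();
+builder.setSpout("user-data-spout", new UserDataSpout(), 1);   // hypothetical spout emitting the fields above
+builder.setBolt("hive-bolt", new HiveBolt(hiveOptions), 1)
+       .shuffleGrouping("user-data-spout");
+```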
+
+
+ 
+## HiveState (org.apache.storm.hive.trident.HiveState)
+
+The Hive Trident state follows a similar pattern to HiveBolt: it takes in HiveOptions as an arg.
+
+```java
+   DelimitedRecordHiveMapper mapper = new DelimitedRecordHiveMapper()
+            .withColumnFields(new Fields(colNames))
+            .withTimeAsPartitionField("YYYY/MM/DD");
+
+   HiveOptions hiveOptions = new HiveOptions(metaStoreURI, dbName, tblName, mapper)
+                                .withTxnsPerBatch(10)
+                                .withBatchSize(1000)
+                                .withIdleTimeout(10);
+
+   StateFactory factory = new HiveStateFactory().withOptions(hiveOptions);
+   TridentState state = stream.partitionPersist(factory, hiveFields, new HiveUpdater(), new Fields());
+```
+   
+
+
+
+ 

http://git-wip-us.apache.org/repos/asf/storm/blob/af6d8ffc/documentation/storm-jdbc.md
----------------------------------------------------------------------
diff --git a/documentation/storm-jdbc.md b/documentation/storm-jdbc.md
new file mode 100644
index 0000000..15aa2a3
--- /dev/null
+++ b/documentation/storm-jdbc.md
@@ -0,0 +1,285 @@
+---
+title: Storm JDBC Integration
+layout: documentation
+documentation: true
+---
+
+Storm/Trident integration for JDBC. This package includes the core bolts and trident states that allow a storm topology
+to either insert storm tuples into a database table or execute select queries against a database and enrich tuples
+in a storm topology.
+
+**Note**: Throughout the examples below, we make use of com.google.common.collect.Lists and com.google.common.collect.Maps.
+
+## Inserting into a database
+The bolt and trident state included in this package for inserting data into a database are tied to a single table.
+
+### ConnectionProvider
+`org.apache.storm.jdbc.common.ConnectionProvider` is an interface that should be implemented by different connection pooling mechanisms:
+
+```java
+public interface ConnectionProvider extends Serializable {
+    /**
+     * method must be idempotent.
+     */
+    void prepare();
+
+    /**
+     *
+     * @return a DB connection over which the queries can be executed.
+     */
+    Connection getConnection();
+
+    /**
+     * called once when the system is shutting down, should be idempotent.
+     */
+    void cleanup();
+}
+```
+
+Out of the box we support `org.apache.storm.jdbc.common.HikariCPConnectionProvider` which is an implementation that uses HikariCP.
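+
+If you need a different strategy, a minimal (non-pooling) sketch of a custom implementation backed by
+`java.sql.DriverManager` could look like this; the class name and constructor arguments are illustrative:
+
+```java
+// Uses java.sql.Connection, java.sql.DriverManager and java.sql.SQLException.
+public class DriverManagerConnectionProvider implements ConnectionProvider {
+    private final String jdbcUrl;
+    private final String user;
+    private final String password;
+
+    public DriverManagerConnectionProvider(String jdbcUrl, String user, String password) {
+        this.jdbcUrl = jdbcUrl;
+        this.user = user;
+        this.password = password;
+    }
+
+    @Override
+    public void prepare() {
+        // nothing to initialize; kept idempotent as the interface requires
+    }
+
+    @Override
+    public Connection getConnection() {
+        try {
+            // opens a new connection per call; a real implementation would pool connections
+            return DriverManager.getConnection(jdbcUrl, user, password);
+        } catch (SQLException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    @Override
+    public void cleanup() {
+        // no pooled resources to release
+    }
+}
+```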
+
+### JdbcMapper
+The main API for inserting data in a table using JDBC is the `org.apache.storm.jdbc.mapper.JdbcMapper` interface:
+
+```java
+public interface JdbcMapper  extends Serializable {
+    List<Column> getColumns(ITuple tuple);
+}
+```
+
+The `getColumns()` method defines how a storm tuple maps to a list of columns representing a row in a database.
+**The order of the returned list is important. The placeholders in the supplied queries are resolved in the same order as the returned list.**
+For example, if the user-supplied insert query is `insert into user(user_id, user_name, create_date) values (?,?, now())`, the 1st item
+of the list returned by `getColumns` will map to the 1st placeholder, the 2nd to the 2nd, and so on. We do not parse
+the supplied queries to try to resolve placeholders by column names. Not making any assumptions about the query syntax allows this connector
+to be used with some non-standard SQL frameworks like Phoenix, which only supports `UPSERT INTO`. A hand-written mapper for this query is sketched below.
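+
+A sketch of such a mapper for that insert query (the tuple field names are assumptions about your spout's output;
+`Column` is org.apache.storm.jdbc.common.Column):
+
+```java
+public class UserJdbcMapper implements JdbcMapper {
+    @Override
+    public List<Column> getColumns(ITuple tuple) {
+        List<Column> columns = new ArrayList<Column>();
+        // Order matters: the 1st column binds to the 1st '?', the 2nd to the 2nd.
+        columns.add(new Column("user_id", tuple.getIntegerByField("user_id"), java.sql.Types.INTEGER));
+        columns.add(new Column("user_name", tuple.getStringByField("user_name"), java.sql.Types.VARCHAR));
+        return columns;
+    }
+}
+```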
+
+### JdbcInsertBolt
+To use the `JdbcInsertBolt`, you construct an instance of it by specifying a `ConnectionProvider` implementation
+and a `JdbcMapper` implementation that converts a storm tuple to a DB row. In addition, you must either supply
+a table name using the `withTableName` method or an insert query using `withInsertQuery`.
+If you specify an insert query, you should ensure that your `JdbcMapper` implementation will return a list of columns in the same order as in your insert query.
+You can optionally specify a query timeout seconds param that specifies the maximum number of seconds an insert query can take.
+The default is set to the value of topology.message.timeout.secs, and a value of -1 indicates not to set any query timeout.
+You should set the query timeout value to be <= topology.message.timeout.secs.
+
+ ```java
+Map hikariConfigMap = Maps.newHashMap();
+hikariConfigMap.put("dataSourceClassName","com.mysql.jdbc.jdbc2.optional.MysqlDataSource");
+hikariConfigMap.put("dataSource.url", "jdbc:mysql://localhost/test");
+hikariConfigMap.put("dataSource.user","root");
+hikariConfigMap.put("dataSource.password","password");
+ConnectionProvider connectionProvider = new HikariCPConnectionProvider(hikariConfigMap);
+
+String tableName = "user_details";
+JdbcMapper simpleJdbcMapper = new SimpleJdbcMapper(tableName, connectionProvider);
+
+JdbcInsertBolt userPersistanceBolt = new JdbcInsertBolt(connectionProvider, simpleJdbcMapper)
+                                    .withTableName("user")
+                                    .withQueryTimeoutSecs(30);
+                                    // or
+JdbcInsertBolt userPersistanceBolt = new JdbcInsertBolt(connectionProvider, simpleJdbcMapper)
+                                    .withInsertQuery("insert into user values (?,?)")
+                                    .withQueryTimeoutSecs(30);                                    
+ ```
+
+### SimpleJdbcMapper
+`storm-jdbc` includes a general purpose `JdbcMapper` implementation called `SimpleJdbcMapper` that can map a Storm
+tuple to a database row. `SimpleJdbcMapper` assumes that the storm tuple has fields with the same names as the column names in
+the database table that you intend to write to.
+
+To use `SimpleJdbcMapper`, you simply tell it the tableName that you want to write to and provide a connectionProvider instance.
+
+The following code creates a `SimpleJdbcMapper` instance that:
+
+1. Will allow the mapper to transform a storm tuple to a list of columns mapping to a row in table test.user_details.
+2. Will use the provided HikariCP configuration to establish a connection pool with specified Database configuration and
+automatically figure out the column names and corresponding data types of the table that you intend to write to. 
+Please see https://github.com/brettwooldridge/HikariCP#configuration-knobs-baby to learn more about hikari configuration properties.
+
+```java
+Map hikariConfigMap = Maps.newHashMap();
+hikariConfigMap.put("dataSourceClassName","com.mysql.jdbc.jdbc2.optional.MysqlDataSource");
+hikariConfigMap.put("dataSource.url", "jdbc:mysql://localhost/test");
+hikariConfigMap.put("dataSource.user","root");
+hikariConfigMap.put("dataSource.password","password");
+ConnectionProvider connectionProvider = new HikariCPConnectionProvider(hikariConfigMap);
+String tableName = "user_details";
+JdbcMapper simpleJdbcMapper = new SimpleJdbcMapper(tableName, connectionProvider);
+```
+The mapper initialized in the example above assumes a storm tuple has a value for all the columns of the table you intend to insert data into, and its `getColumns`
+method will return the columns in the order in which the JDBC connection's `connection.getMetaData().getColumns()` method returns them.
+
+**If you specified your own insert query to `JdbcInsertBolt`, you must initialize `SimpleJdbcMapper` with an explicit column schema such that the schema has columns in the same order as your insert query.**
+For example, if your insert query is `Insert into user (user_id, user_name) values (?,?)` then your `SimpleJdbcMapper` should be initialized with the following statements:
+```java
+List<Column> columnSchema = Lists.newArrayList(
+    new Column("user_id", java.sql.Types.INTEGER),
+    new Column("user_name", java.sql.Types.VARCHAR));
+JdbcMapper simpleJdbcMapper = new SimpleJdbcMapper(columnSchema);
+```
+
+If your storm tuple only has fields for a subset of columns, i.e. if some of the columns in your table have default values and you want to insert values only for columns with no default values, you can enforce this behavior by initializing the
+`SimpleJdbcMapper` with an explicit column schema. For example, suppose you have a user_details table created with `create table if not exists user_details (user_id integer, user_name varchar(100), dept_name varchar(100), create_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP);`
+In this table the create_time column has a default value. To ensure only the columns with no default values are inserted,
+you can initialize the `jdbcMapper` as below:
+
+```java
+List<Column> columnSchema = Lists.newArrayList(
+    new Column("user_id", java.sql.Types.INTEGER),
+    new Column("user_name", java.sql.Types.VARCHAR),
+    new Column("dept_name", java.sql.Types.VARCHAR));
+JdbcMapper simpleJdbcMapper = new SimpleJdbcMapper(columnSchema);
+```
+### JdbcTridentState
+We also support a trident persistent state that can be used with trident topologies. To create a JDBC persistent trident
+state you need to initialize it with the table name or an insert query, a JdbcMapper instance, and a connection provider instance.
+See the example below:
+
+```java
+JdbcState.Options options = new JdbcState.Options()
+        .withConnectionProvider(connectionProvider)
+        .withMapper(jdbcMapper)
+        .withTableName("user_details")
+        .withQueryTimeoutSecs(30);
+JdbcStateFactory jdbcStateFactory = new JdbcStateFactory(options);
+```
+Similar to `JdbcInsertBolt`, you can specify a custom insert query using `withInsertQuery` instead of specifying a table name.
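+
+The resulting factory is then used with `partitionPersist`; a sketch (the stream and field names are illustrative,
+and `JdbcUpdater` is the state updater shipped with storm-jdbc):
+
+```java
+TridentState state = stream.partitionPersist(
+        jdbcStateFactory,
+        new Fields("user_id", "user_name", "dept_name", "create_date"),
+        new JdbcUpdater(),
+        new Fields());
+```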
+
+## Lookup from Database
+We support `select` queries from databases to allow enrichment of storm tuples in a topology. The main API for 
+executing select queries against a database using JDBC is the `org.apache.storm.jdbc.mapper.JdbcLookupMapper` interface:
+
+```java
+    void declareOutputFields(OutputFieldsDeclarer declarer);
+    List<Column> getColumns(ITuple tuple);
+    List<Values> toTuple(ITuple input, List<Column> columns);
+```
+
+The `declareOutputFields` method is used to indicate what fields will be emitted as part of the output tuple when processing a storm
+tuple.
+
+The `getColumns` method specifies the placeholder columns in a select query along with their SQL type and the value to use.
+For example, for the user_details table mentioned above, if you were executing the query `select user_name from user_details where
+user_id = ? and create_time > ?`, the `getColumns` method would take a storm input tuple and return a List containing two items.
+The value of the first `Column` instance will be used as the value of `user_id` to look up, and the
+value of the second `Column` instance will be used as the value of `create_time`.
+**Note: the order in the returned list determines the placeholder values. In other words, the first item in the list maps
+to the first `?` in the select query, the second item to the second `?`, and so on.**
+
+The `toTuple` method takes in the input tuple and a list of columns representing a DB row returned by the select query,
+and returns a list of values to be emitted.
+**Please note that it returns a list of `Values` and not just a single instance of `Values`.**
+This allows a single DB row to be mapped to multiple output storm tuples. A hand-written implementation is sketched below.
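+
+A hand-written implementation for the `select user_name from user_details where user_id = ?` query might look like
+this sketch (the tuple field names are assumptions; `Lists` is Guava, as noted earlier, and the `Column` getter names
+are assumed from storm-jdbc's `Column` class):
+
+```java
+public class UserNameLookupMapper implements JdbcLookupMapper {
+    @Override
+    public void declareOutputFields(OutputFieldsDeclarer declarer) {
+        declarer.declare(new Fields("user_id", "user_name"));
+    }
+
+    @Override
+    public List<Column> getColumns(ITuple tuple) {
+        // One Column per '?' placeholder, in order.
+        return Lists.newArrayList(
+                new Column("user_id", tuple.getIntegerByField("user_id"), Types.INTEGER));
+    }
+
+    @Override
+    public List<Values> toTuple(ITuple input, List<Column> columns) {
+        List<Values> rows = new ArrayList<Values>();
+        for (Column column : columns) {
+            // Combine the input tuple's user_id with the user_name returned by the query.
+            if ("user_name".equalsIgnoreCase(column.getColumnName())) {
+                rows.add(new Values(input.getValueByField("user_id"), column.getVal()));
+            }
+        }
+        return rows;
+    }
+}
+```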
+
+### SimpleJdbcLookupMapper
+`storm-jdbc` includes a general purpose `JdbcLookupMapper` implementation called `SimpleJdbcLookupMapper`. 
+
+To use `SimpleJdbcLookupMapper`, you have to initialize it with the fields that will be output by your bolt and the list of
+columns that are used in your select query as placeholders. The following example shows initialization of a `SimpleJdbcLookupMapper`
+that declares `user_id,user_name,create_date` as output fields and `user_id` as the placeholder column in the select query.
+`SimpleJdbcLookupMapper` assumes the field name in your tuple is equal to the placeholder column name, i.e. in our example
+`SimpleJdbcLookupMapper` will look for a field `user_id` in the input tuple and use its value as the placeholder's value in the
+select query. For constructing output tuples, it first looks for the fields specified in `outputFields` in the input tuple;
+if a field is not found in the input tuple, it then looks at the select query's output row for a column with the same name as the field.
+So in the example below, if the input tuple had fields `user_id, create_date` and the select query was
+`select user_name from user_details where user_id = ?`, then for each input tuple `SimpleJdbcLookupMapper.getColumns(tuple)`
+will return the value of `tuple.getValueByField("user_id")`, which will be used as the value of the `?` in the select query.
+For each output row from the DB, `SimpleJdbcLookupMapper.toTuple()` will use the `user_id, create_date` from the input tuple as
+is, add only `user_name` from the resulting row, and return these 3 fields as a single output tuple.
+
+```java
+Fields outputFields = new Fields("user_id", "user_name", "create_date");
+List<Column> queryParamColumns = Lists.newArrayList(new Column("user_id", Types.INTEGER));
+this.jdbcLookupMapper = new SimpleJdbcLookupMapper(outputFields, queryParamColumns);
+```
+
+### JdbcLookupBolt
+To use the `JdbcLookupBolt`, construct an instance of it using a `ConnectionProvider` instance, a `JdbcLookupMapper` instance, and the select query to execute.
+You can optionally specify a query timeout seconds param that specifies the maximum number of seconds the select query can take.
+The default is set to the value of topology.message.timeout.secs. You should set this value to be <= topology.message.timeout.secs.
+
+```java
+String selectSql = "select user_name from user_details where user_id = ?";
+SimpleJdbcLookupMapper lookupMapper = new SimpleJdbcLookupMapper(outputFields, queryParamColumns);
+JdbcLookupBolt userNameLookupBolt = new JdbcLookupBolt(connectionProvider, selectSql, lookupMapper)
+        .withQueryTimeoutSecs(30);
+```
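+
+A sketch of wiring the lookup bolt between a spout and a downstream persistence bolt (component ids, the spout class,
+and the persistence bolt are illustrative):
+
+```java
+TopologyBuilder builder = new TopologyBuilder();
+builder.setSpout("user-spout", new UserSpout(), 1);   // hypothetical spout emitting user_id, create_date
+builder.setBolt("user-name-lookup", userNameLookupBolt, 1).shuffleGrouping("user-spout");
+builder.setBolt("user-persistance", userPersistanceBolt, 1).shuffleGrouping("user-name-lookup");
+```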
+
+### JdbcTridentState for lookup
+We also support a trident query state that can be used with trident topologies. 
+
+```java
+JdbcState.Options options = new JdbcState.Options()
+        .withConnectionProvider(connectionProvider)
+        .withJdbcLookupMapper(new SimpleJdbcLookupMapper(new Fields("user_name"), Lists.newArrayList(new Column("user_id", Types.INTEGER))))
+        .withSelectQuery("select user_name from user_details where user_id = ?")
+        .withQueryTimeoutSecs(30);
+```
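+
+The lookup state is then registered as a static state and queried from the stream; a sketch (stream and field names
+are illustrative, and `JdbcQuery` is the query function shipped with storm-jdbc):
+
+```java
+JdbcStateFactory jdbcStateFactory = new JdbcStateFactory(options);
+TridentState state = topology.newStaticState(jdbcStateFactory);
+stream = stream.stateQuery(state, new Fields("user_id"), new JdbcQuery(), new Fields("user_name"));
+```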
+
+## Example:
+A runnable example can be found in the `src/test/java/topology` directory.
+
+### Setup
+* Ensure you have included the JDBC driver dependency for your chosen database as part of your build configuration.
+* The test topologies execute the following queries, so your intended DB must support these queries for the test topologies
+to work.
+```SQL
+create table if not exists user (user_id integer, user_name varchar(100), dept_name varchar(100), create_date date);
+create table if not exists department (dept_id integer, dept_name varchar(100));
+create table if not exists user_department (user_id integer, dept_id integer);
+insert into department values (1, 'R&D');
+insert into department values (2, 'Finance');
+insert into department values (3, 'HR');
+insert into department values (4, 'Sales');
+insert into user_department values (1, 1);
+insert into user_department values (2, 2);
+insert into user_department values (3, 3);
+insert into user_department values (4, 4);
+select dept_name from department, user_department where department.dept_id = user_department.dept_id and user_department.user_id = ?;
+```
+### Execution
+Run the `org.apache.storm.jdbc.topology.UserPersistanceTopology` class using the storm jar command. The class expects 5 args:
+`storm jar <topology-jar> org.apache.storm.jdbc.topology.UserPersistanceTopology <dataSourceClassName> <dataSource.url> <user> <password> [topology name]`
+
+To make it work with MySQL, you can add the following to your pom.xml:
+
+```xml
+<dependency>
+    <groupId>mysql</groupId>
+    <artifactId>mysql-connector-java</artifactId>
+    <version>5.1.31</version>
+</dependency>
+```
+
+You can generate a single jar with dependencies using the Maven assembly plugin. To use the plugin, add the following to your pom.xml and execute
+`mvn clean compile assembly:single`:
+
+```xml
+<plugin>
+    <artifactId>maven-assembly-plugin</artifactId>
+    <configuration>
+        <archive>
+            <manifest>
+                <mainClass>fully.qualified.MainClass</mainClass>
+            </manifest>
+        </archive>
+        <descriptorRefs>
+            <descriptorRef>jar-with-dependencies</descriptorRef>
+        </descriptorRefs>
+    </configuration>
+</plugin>
+```
+
+MySQL example:
+```
+storm jar ~/repo/incubator-storm/external/storm-jdbc/target/storm-jdbc-0.10.0-SNAPSHOT-jar-with-dependencies.jar org.apache.storm.jdbc.topology.UserPersistanceTopology  com.mysql.jdbc.jdbc2.optional.MysqlDataSource jdbc:mysql://localhost/test root password UserPersistenceTopology
+```
+
+You can execute a select query against the user table which should show newly inserted rows:
+
+```
+select * from user;
+```
+
+For Trident, see `org.apache.storm.jdbc.topology.UserPersistanceTridentTopology`.