You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by pt...@apache.org on 2015/06/04 04:06:08 UTC

[26/50] [abbrv] storm git commit: merge flux into external/flux/

merge flux into external/flux/


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/b21a98dd
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/b21a98dd
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/b21a98dd

Branch: refs/heads/0.10.x-branch
Commit: b21a98dd87f82a06a6295ab2bfd832c2810ca57e
Parents: ea0fe12 b372a11
Author: P. Taylor Goetz <pt...@gmail.com>
Authored: Wed May 6 13:31:04 2015 -0400
Committer: P. Taylor Goetz <pt...@gmail.com>
Committed: Wed May 6 13:31:04 2015 -0400

----------------------------------------------------------------------
 external/flux/.gitignore                        |  15 +
 external/flux/LICENSE                           | 202 +++++
 external/flux/README.md                         | 845 +++++++++++++++++++
 external/flux/flux-core/pom.xml                 |  94 +++
 .../main/java/org/apache/storm/flux/Flux.java   | 263 ++++++
 .../java/org/apache/storm/flux/FluxBuilder.java | 591 +++++++++++++
 .../apache/storm/flux/api/TopologySource.java   |  39 +
 .../org/apache/storm/flux/model/BeanDef.java    |  39 +
 .../apache/storm/flux/model/BeanReference.java  |  39 +
 .../org/apache/storm/flux/model/BoltDef.java    |  24 +
 .../storm/flux/model/ConfigMethodDef.java       |  62 ++
 .../storm/flux/model/ExecutionContext.java      |  77 ++
 .../apache/storm/flux/model/GroupingDef.java    |  77 ++
 .../org/apache/storm/flux/model/IncludeDef.java |  54 ++
 .../org/apache/storm/flux/model/ObjectDef.java  |  90 ++
 .../apache/storm/flux/model/PropertyDef.java    |  58 ++
 .../org/apache/storm/flux/model/SpoutDef.java   |  24 +
 .../org/apache/storm/flux/model/StreamDef.java  |  64 ++
 .../apache/storm/flux/model/TopologyDef.java    | 216 +++++
 .../storm/flux/model/TopologySourceDef.java     |  36 +
 .../org/apache/storm/flux/model/VertexDef.java  |  36 +
 .../apache/storm/flux/parser/FluxParser.java    | 202 +++++
 .../flux-core/src/main/resources/splash.txt     |   9 +
 .../org/apache/storm/flux/FluxBuilderTest.java  |  31 +
 .../org/apache/storm/flux/IntegrationTest.java  |  41 +
 .../java/org/apache/storm/flux/TCKTest.java     | 234 +++++
 .../multilang/MultilangEnvirontmentTest.java    |  89 ++
 .../apache/storm/flux/test/SimpleTopology.java  |  42 +
 .../storm/flux/test/SimpleTopologySource.java   |  35 +
 .../test/SimpleTopologyWithConfigParam.java     |  38 +
 .../org/apache/storm/flux/test/TestBolt.java    |  63 ++
 .../storm/flux/test/TridentTopologySource.java  |  54 ++
 .../src/test/resources/configs/bad_hbase.yaml   |  98 +++
 .../resources/configs/config-methods-test.yaml  |  70 ++
 .../existing-topology-method-override.yaml      |  10 +
 .../existing-topology-reflection-config.yaml    |   9 +
 .../configs/existing-topology-reflection.yaml   |   9 +
 .../configs/existing-topology-trident.yaml      |   9 +
 .../resources/configs/existing-topology.yaml    |   8 +
 .../src/test/resources/configs/hdfs_test.yaml   |  97 +++
 .../test/resources/configs/include_test.yaml    |  25 +
 .../configs/invalid-existing-topology.yaml      |  17 +
 .../src/test/resources/configs/kafka_test.yaml  | 126 +++
 .../src/test/resources/configs/shell_test.yaml  | 104 +++
 .../test/resources/configs/simple_hbase.yaml    | 120 +++
 .../resources/configs/substitution-test.yaml    | 106 +++
 .../src/test/resources/configs/tck.yaml         |  95 +++
 .../src/test/resources/configs/test.properties  |   2 +
 .../flux-core/src/test/resources/logback.xml    |  30 +
 external/flux/flux-examples/README.md           |  68 ++
 external/flux/flux-examples/pom.xml             |  87 ++
 .../storm/flux/examples/WordCountClient.java    |  74 ++
 .../apache/storm/flux/examples/WordCounter.java |  71 ++
 .../src/main/resources/hbase_bolt.properties    |  18 +
 .../src/main/resources/hdfs_bolt.properties     |  26 +
 .../src/main/resources/kafka_spout.yaml         | 136 +++
 .../src/main/resources/multilang.yaml           |  89 ++
 .../src/main/resources/simple_hbase.yaml        |  92 ++
 .../src/main/resources/simple_hdfs.yaml         | 105 +++
 .../src/main/resources/simple_wordcount.yaml    |  68 ++
 external/flux/flux-ui/README.md                 |   3 +
 external/flux/flux-wrappers/pom.xml             |  35 +
 .../flux/wrappers/bolts/FluxShellBolt.java      |  56 ++
 .../storm/flux/wrappers/bolts/LogInfoBolt.java  |  44 +
 .../flux/wrappers/spouts/FluxShellSpout.java    |  55 ++
 .../main/resources/resources/randomsentence.js  |  93 ++
 .../main/resources/resources/splitsentence.py   |  24 +
 .../src/main/resources/resources/storm.js       | 373 ++++++++
 .../src/main/resources/resources/storm.py       | 260 ++++++
 external/flux/pom.xml                           | 126 +++
 70 files changed, 6621 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/b21a98dd/external/flux/.gitignore
----------------------------------------------------------------------
diff --cc external/flux/.gitignore
index 0000000,0000000..35fb1db
new file mode 100644
--- /dev/null
+++ b/external/flux/.gitignore
@@@ -1,0 -1,0 +1,15 @@@
++*.class
++**/target
++
++# Package Files #
++*.jar
++*.war
++*.ear
++
++# Intellij
++**/*.iml
++**/*.ipr
++**/*.iws
++
++# virtual machine crash logs, see http://www.java.com/en/download/help/error_hotspot.xml
++hs_err_pid*

http://git-wip-us.apache.org/repos/asf/storm/blob/b21a98dd/external/flux/LICENSE
----------------------------------------------------------------------
diff --cc external/flux/LICENSE
index 0000000,0000000..e06d208
new file mode 100644
--- /dev/null
+++ b/external/flux/LICENSE
@@@ -1,0 -1,0 +1,202 @@@
++Apache License
++                           Version 2.0, January 2004
++                        http://www.apache.org/licenses/
++
++   TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION
++
++   1. Definitions.
++
++      "License" shall mean the terms and conditions for use, reproduction,
++      and distribution as defined by Sections 1 through 9 of this document.
++
++      "Licensor" shall mean the copyright owner or entity authorized by
++      the copyright owner that is granting the License.
++
++      "Legal Entity" shall mean the union of the acting entity and all
++      other entities that control, are controlled by, or are under common
++      control with that entity. For the purposes of this definition,
++      "control" means (i) the power, direct or indirect, to cause the
++      direction or management of such entity, whether by contract or
++      otherwise, or (ii) ownership of fifty percent (50%) or more of the
++      outstanding shares, or (iii) beneficial ownership of such entity.
++
++      "You" (or "Your") shall mean an individual or Legal Entity
++      exercising permissions granted by this License.
++
++      "Source" form shall mean the preferred form for making modifications,
++      including but not limited to software source code, documentation
++      source, and configuration files.
++
++      "Object" form shall mean any form resulting from mechanical
++      transformation or translation of a Source form, including but
++      not limited to compiled object code, generated documentation,
++      and conversions to other media types.
++
++      "Work" shall mean the work of authorship, whether in Source or
++      Object form, made available under the License, as indicated by a
++      copyright notice that is included in or attached to the work
++      (an example is provided in the Appendix below).
++
++      "Derivative Works" shall mean any work, whether in Source or Object
++      form, that is based on (or derived from) the Work and for which the
++      editorial revisions, annotations, elaborations, or other modifications
++      represent, as a whole, an original work of authorship. For the purposes
++      of this License, Derivative Works shall not include works that remain
++      separable from, or merely link (or bind by name) to the interfaces of,
++      the Work and Derivative Works thereof.
++
++      "Contribution" shall mean any work of authorship, including
++      the original version of the Work and any modifications or additions
++      to that Work or Derivative Works thereof, that is intentionally
++      submitted to Licensor for inclusion in the Work by the copyright owner
++      or by an individual or Legal Entity authorized to submit on behalf of
++      the copyright owner. For the purposes of this definition, "submitted"
++      means any form of electronic, verbal, or written communication sent
++      to the Licensor or its representatives, including but not limited to
++      communication on electronic mailing lists, source code control systems,
++      and issue tracking systems that are managed by, or on behalf of, the
++      Licensor for the purpose of discussing and improving the Work, but
++      excluding communication that is conspicuously marked or otherwise
++      designated in writing by the copyright owner as "Not a Contribution."
++
++      "Contributor" shall mean Licensor and any individual or Legal Entity
++      on behalf of whom a Contribution has been received by Licensor and
++      subsequently incorporated within the Work.
++
++   2. Grant of Copyright License. Subject to the terms and conditions of
++      this License, each Contributor hereby grants to You a perpetual,
++      worldwide, non-exclusive, no-charge, royalty-free, irrevocable
++      copyright license to reproduce, prepare Derivative Works of,
++      publicly display, publicly perform, sublicense, and distribute the
++      Work and such Derivative Works in Source or Object form.
++
++   3. Grant of Patent License. Subject to the terms and conditions of
++      this License, each Contributor hereby grants to You a perpetual,
++      worldwide, non-exclusive, no-charge, royalty-free, irrevocable
++      (except as stated in this section) patent license to make, have made,
++      use, offer to sell, sell, import, and otherwise transfer the Work,
++      where such license applies only to those patent claims licensable
++      by such Contributor that are necessarily infringed by their
++      Contribution(s) alone or by combination of their Contribution(s)
++      with the Work to which such Contribution(s) was submitted. If You
++      institute patent litigation against any entity (including a
++      cross-claim or counterclaim in a lawsuit) alleging that the Work
++      or a Contribution incorporated within the Work constitutes direct
++      or contributory patent infringement, then any patent licenses
++      granted to You under this License for that Work shall terminate
++      as of the date such litigation is filed.
++
++   4. Redistribution. You may reproduce and distribute copies of the
++      Work or Derivative Works thereof in any medium, with or without
++      modifications, and in Source or Object form, provided that You
++      meet the following conditions:
++
++      (a) You must give any other recipients of the Work or
++          Derivative Works a copy of this License; and
++
++      (b) You must cause any modified files to carry prominent notices
++          stating that You changed the files; and
++
++      (c) You must retain, in the Source form of any Derivative Works
++          that You distribute, all copyright, patent, trademark, and
++          attribution notices from the Source form of the Work,
++          excluding those notices that do not pertain to any part of
++          the Derivative Works; and
++
++      (d) If the Work includes a "NOTICE" text file as part of its
++          distribution, then any Derivative Works that You distribute must
++          include a readable copy of the attribution notices contained
++          within such NOTICE file, excluding those notices that do not
++          pertain to any part of the Derivative Works, in at least one
++          of the following places: within a NOTICE text file distributed
++          as part of the Derivative Works; within the Source form or
++          documentation, if provided along with the Derivative Works; or,
++          within a display generated by the Derivative Works, if and
++          wherever such third-party notices normally appear. The contents
++          of the NOTICE file are for informational purposes only and
++          do not modify the License. You may add Your own attribution
++          notices within Derivative Works that You distribute, alongside
++          or as an addendum to the NOTICE text from the Work, provided
++          that such additional attribution notices cannot be construed
++          as modifying the License.
++
++      You may add Your own copyright statement to Your modifications and
++      may provide additional or different license terms and conditions
++      for use, reproduction, or distribution of Your modifications, or
++      for any such Derivative Works as a whole, provided Your use,
++      reproduction, and distribution of the Work otherwise complies with
++      the conditions stated in this License.
++
++   5. Submission of Contributions. Unless You explicitly state otherwise,
++      any Contribution intentionally submitted for inclusion in the Work
++      by You to the Licensor shall be under the terms and conditions of
++      this License, without any additional terms or conditions.
++      Notwithstanding the above, nothing herein shall supersede or modify
++      the terms of any separate license agreement you may have executed
++      with Licensor regarding such Contributions.
++
++   6. Trademarks. This License does not grant permission to use the trade
++      names, trademarks, service marks, or product names of the Licensor,
++      except as required for reasonable and customary use in describing the
++      origin of the Work and reproducing the content of the NOTICE file.
++
++   7. Disclaimer of Warranty. Unless required by applicable law or
++      agreed to in writing, Licensor provides the Work (and each
++      Contributor provides its Contributions) on an "AS IS" BASIS,
++      WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
++      implied, including, without limitation, any warranties or conditions
++      of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A
++      PARTICULAR PURPOSE. You are solely responsible for determining the
++      appropriateness of using or redistributing the Work and assume any
++      risks associated with Your exercise of permissions under this License.
++
++   8. Limitation of Liability. In no event and under no legal theory,
++      whether in tort (including negligence), contract, or otherwise,
++      unless required by applicable law (such as deliberate and grossly
++      negligent acts) or agreed to in writing, shall any Contributor be
++      liable to You for damages, including any direct, indirect, special,
++      incidental, or consequential damages of any character arising as a
++      result of this License or out of the use or inability to use the
++      Work (including but not limited to damages for loss of goodwill,
++      work stoppage, computer failure or malfunction, or any and all
++      other commercial damages or losses), even if such Contributor
++      has been advised of the possibility of such damages.
++
++   9. Accepting Warranty or Additional Liability. While redistributing
++      the Work or Derivative Works thereof, You may choose to offer,
++      and charge a fee for, acceptance of support, warranty, indemnity,
++      or other liability obligations and/or rights consistent with this
++      License. However, in accepting such obligations, You may act only
++      on Your own behalf and on Your sole responsibility, not on behalf
++      of any other Contributor, and only if You agree to indemnify,
++      defend, and hold each Contributor harmless for any liability
++      incurred by, or claims asserted against, such Contributor by reason
++      of your accepting any such warranty or additional liability.
++
++   END OF TERMS AND CONDITIONS
++
++   APPENDIX: How to apply the Apache License to your work.
++
++      To apply the Apache License to your work, attach the following
++      boilerplate notice, with the fields enclosed by brackets "{}"
++      replaced with your own identifying information. (Don't include
++      the brackets!)  The text should be enclosed in the appropriate
++      comment syntax for the file format. We also recommend that a
++      file or class name and description of purpose be included on the
++      same "printed page" as the copyright notice for easier
++      identification within third-party archives.
++
++   Copyright {yyyy} {name of copyright owner}
++
++   Licensed under the Apache License, Version 2.0 (the "License");
++   you may not use this file except in compliance with the License.
++   You may obtain a copy of the License at
++
++       http://www.apache.org/licenses/LICENSE-2.0
++
++   Unless required by applicable law or agreed to in writing, software
++   distributed under the License is distributed on an "AS IS" BASIS,
++   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
++   See the License for the specific language governing permissions and
++   limitations under the License.
++

http://git-wip-us.apache.org/repos/asf/storm/blob/b21a98dd/external/flux/README.md
----------------------------------------------------------------------
diff --cc external/flux/README.md
index 0000000,0000000..6f27219
new file mode 100644
--- /dev/null
+++ b/external/flux/README.md
@@@ -1,0 -1,0 +1,845 @@@
++# flux
++A framework for creating and deploying Apache Storm streaming computations with less friction.
++
++## Definition
++**flux** |fləks| _noun_
++
++1. The action or process of flowing or flowing out
++2. Continuous change
++3. In physics, the rate of flow of a fluid, radiant energy, or particles across a given area
++4. A substance mixed with a solid to lower its melting point
++
++## Rationale
++Bad things happen when configuration is hard-coded. No one should have to recompile or repackage an application in
++order to change configuration.
++
++## About
++Flux is a framework and set of utilities that make defining and deploying Apache Storm topologies less painful and
++developer-intensive.
++
++Have you ever found yourself repeating this pattern?:
++
++```java
++
++public static void main(String[] args) throws Exception {
++    // logic to determine if we're running locally or not...
++    // create necessary config options...
++    boolean runLocal = shouldRunLocal();
++    if(runLocal){
++        LocalCluster cluster = new LocalCluster();
++        cluster.submitTopology(name, conf, topology);
++    } else {
++        StormSubmitter.submitTopology(name, conf, topology);
++    }
++}
++```
++
++Wouldn't something like this be easier:
++
++```bash
++storm jar mytopology.jar org.apache.storm.flux.Flux --local config.yaml
++```
++
++or:
++
++```bash
++storm jar mytopology.jar org.apache.storm.flux.Flux --remote config.yaml
++```
++
++Another pain point often mentioned is the fact that the wiring for a Topology graph is often tied up in Java code,
++and that any changes require recompilation and repackaging of the topology jar file. Flux aims to alleviate that
++pain by allowing you to package all your Storm components in a single jar, and use an external text file to define
++the layout and configuration of your topologies.
++
++## Features
++
++ * Easily configure and deploy Storm topologies (Both Storm core and Microbatch API) without embedding configuration
++   in your topology code
++ * Support for existing topology code (see below)
++ * Define Storm Core API (Spouts/Bolts) using a flexible YAML DSL
++ * YAML DSL support for most Storm components (storm-kafka, storm-hdfs, storm-hbase, etc.)
++ * Convenient support for multi-lang components
++ * External property substitution/filtering for easily switching between configurations/environments (similar to Maven-style
++   `${variable.name}` substitution)
++
++## Usage
++
++To use Flux, add it as a dependency and package all your Storm components in a fat jar, then create a YAML document
++to define your topology (see below for YAML configuration options).
++
++### Building from Source
++The easiest way to use Flux is to add it as a Maven dependency in your project as described below.
++
++If you would like to build Flux from source and run the unit/integration tests, you will need the following installed
++on your system:
++
++* Python 2.6.x or later
++* Node.js 0.10.x or later
++
++#### Building with unit tests enabled:
++
++```
++mvn clean install
++```
++
++#### Building with unit tests disabled:
++If you would like to build Flux without installing Python or Node.js you can simply skip the unit tests:
++
++```
++mvn clean install -DskipTests=true
++```
++
++Note that if you plan on using Flux to deploy topologies to a remote cluster, you will still need to have Python
++installed since it is required by Apache Storm.
++
++
++#### Building with integration tests enabled:
++
++```
++mvn clean install -DskipIntegration=false
++```
++
++
++### Packaging with Maven
++To enable Flux for your Storm components, you need to add it as a dependency such that it's included in the Storm
++topology jar. This can be accomplished with the Maven shade plugin (preferred) or the Maven assembly plugin (not
++recommended).
++
++#### Flux Maven Dependency
++The current version of Flux is available in Maven Central at the following coordinates:
++```xml
++<dependency>
++    <groupId>com.github.ptgoetz</groupId>
++    <artifactId>flux-core</artifactId>
++    <version>0.3.0</version>
++</dependency>
++```
++
++#### Creating a Flux-Enabled Topology JAR
++The example below illustrates Flux usage with the Maven shade plugin:
++
++ ```xml
++<!-- include Flux and user dependencies in the shaded jar -->
++<dependencies>
++    <!-- Flux include -->
++    <dependency>
++        <groupId>com.github.ptgoetz</groupId>
++        <artifactId>flux-core</artifactId>
++        <version>0.3.0</version>
++    </dependency>
++
++    <!-- add user dependencies here... -->
++
++</dependencies>
++<!-- create a fat jar that includes all dependencies -->
++<build>
++    <plugins>
++        <plugin>
++            <groupId>org.apache.maven.plugins</groupId>
++            <artifactId>maven-shade-plugin</artifactId>
++            <version>1.4</version>
++            <configuration>
++                <createDependencyReducedPom>true</createDependencyReducedPom>
++            </configuration>
++            <executions>
++                <execution>
++                    <phase>package</phase>
++                    <goals>
++                        <goal>shade</goal>
++                    </goals>
++                    <configuration>
++                        <transformers>
++                            <transformer
++                                    implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
++                            <transformer
++                                    implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
++                                <mainClass>org.apache.storm.flux.Flux</mainClass>
++                            </transformer>
++                        </transformers>
++                    </configuration>
++                </execution>
++            </executions>
++        </plugin>
++    </plugins>
++</build>
++ ```
++
++### Deploying and Running a Flux Topology
++Once your topology components are packaged with the Flux dependency, you can run different topologies either locally
++or remotely using the `storm jar` command. For example, if your fat jar is named `myTopology-0.1.0-SNAPSHOT.jar` you
++could run it locally with the command:
++
++
++```bash
++storm jar myTopology-0.1.0-SNAPSHOT.jar org.apache.storm.flux.Flux --local my_config.yaml
++
++```
++
++### Command line options
++```
++usage: storm jar <my_topology_uber_jar.jar> org.apache.storm.flux.Flux
++             [options] <topology-config.yaml>
++ -d,--dry-run                 Do not run or deploy the topology. Just
++                              build, validate, and print information about
++                              the topology.
++ -e,--env-filter              Perform environment variable substitution.
++                              Keys identified with `${ENV-[NAME]}` will be
++                              replaced with the corresponding `NAME`
++                              environment variable's value.
++ -f,--filter <file>           Perform property substitution. Use the
++                              specified file as a source of properties,
++                              and replace keys identified with {$[property
++                              name]} with the value defined in the
++                              properties file.
++ -i,--inactive                Deploy the topology, but do not activate it.
++ -l,--local                   Run the topology in local mode.
++ -n,--no-splash               Suppress the printing of the splash screen.
++ -q,--no-detail               Suppress the printing of topology details.
++ -r,--remote                  Deploy the topology to a remote cluster.
++ -R,--resource                Treat the supplied path as a classpath
++                              resource instead of a file.
++ -s,--sleep <ms>              When running locally, the amount of time to
++                              sleep (in ms.) before killing the topology
++                              and shutting down the local cluster.
++ -z,--zookeeper <host:port>   When running in local mode, use the
++                              ZooKeeper at the specified <host>:<port>
++                              instead of the in-process ZooKeeper.
++                              (requires Storm 0.9.3 or later)
++```
++
++**NOTE:** Flux tries to avoid command line switch collision with the `storm` command, and allows any other command line
++switches to pass through to the `storm` command.
++
++For example, you can use the `storm` command switch `-c` to override a topology configuration property. The following
++example command will run Flux and override the `nimbus.host` configuration:
++
++```bash
++storm jar myTopology-0.1.0-SNAPSHOT.jar org.apache.storm.flux.Flux --remote my_config.yaml -c nimbus.host=localhost
++```
++
++### Sample output
++```
++███████╗██╗     ██╗   ██╗██╗  ██╗
++██╔════╝██║     ██║   ██║╚██╗██╔╝
++█████╗  ██║     ██║   ██║ ╚███╔╝
++██╔══╝  ██║     ██║   ██║ ██╔██╗
++██║     ███████╗╚██████╔╝██╔╝ ██╗
++╚═╝     ╚══════╝ ╚═════╝ ╚═╝  ╚═╝
+++-         Apache Storm        -+
+++-  data FLow User eXperience  -+
++Version: 0.3.0
++Parsing file: /Users/hsimpson/Projects/donut_domination/storm/shell_test.yaml
++---------- TOPOLOGY DETAILS ----------
++Name: shell-topology
++--------------- SPOUTS ---------------
++sentence-spout[1](org.apache.storm.flux.spouts.GenericShellSpout)
++---------------- BOLTS ---------------
++splitsentence[1](org.apache.storm.flux.bolts.GenericShellBolt)
++log[1](org.apache.storm.flux.wrappers.bolts.LogInfoBolt)
++count[1](backtype.storm.testing.TestWordCounter)
++--------------- STREAMS ---------------
++sentence-spout --SHUFFLE--> splitsentence
++splitsentence --FIELDS--> count
++count --SHUFFLE--> log
++--------------------------------------
++Submitting topology: 'shell-topology' to remote cluster...
++```
++
++## YAML Configuration
++Flux topologies are defined in a YAML file that describes a topology. A Flux topology
++definition consists of the following:
++
++  1. A topology name
++  2. A list of topology "components" (named Java objects that will be made available in the environment)
++  3. **EITHER** (A DSL topology definition):
++      * A list of spouts, each identified by a unique ID
++      * A list of bolts, each identified by a unique ID
++      * A list of "stream" objects representing a flow of tuples between spouts and bolts
++  4. **OR** (A JVM class that can produce a `backtype.storm.generated.StormTopology` instance):
++      * A `topologySource` definition.
++
++
++
++For example, here is a simple definition of a wordcount topology using the YAML DSL:
++
++```yaml
++name: "yaml-topology"
++config:
++  topology.workers: 1
++
++# spout definitions
++spouts:
++  - id: "spout-1"
++    className: "backtype.storm.testing.TestWordSpout"
++    parallelism: 1
++
++# bolt definitions
++bolts:
++  - id: "bolt-1"
++    className: "backtype.storm.testing.TestWordCounter"
++    parallelism: 1
++  - id: "bolt-2"
++    className: "org.apache.storm.flux.wrappers.bolts.LogInfoBolt"
++    parallelism: 1
++
++#stream definitions
++streams:
++  - name: "spout-1 --> bolt-1" # name isn't used (placeholder for logging, UI, etc.)
++    from: "spout-1"
++    to: "bolt-1"
++    grouping:
++      type: FIELDS
++      args: ["word"]
++
++  - name: "bolt-1 --> bolt2"
++    from: "bolt-1"
++    to: "bolt-2"
++    grouping:
++      type: SHUFFLE
++
++
++```
++## Property Substitution/Filtering
++It's common for developers to want to easily switch between configurations, for example switching deployment between
++a development environment and a production environment. This can be accomplished by using separate YAML configuration
++files, but that approach would lead to unnecessary duplication, especially in situations where the Storm topology
++does not change, but configuration settings such as host names, ports, and parallelism parameters do.
++
++For this case, Flux offers properties filtering to allow you to externalize values to a `.properties` file and have
++them substituted before the `.yaml` file is parsed.
++
++To enable property filtering, use the `--filter` command line option and specify a `.properties` file. For example,
++if you invoked flux like so:
++
++```bash
++storm jar myTopology-0.1.0-SNAPSHOT.jar org.apache.storm.flux.Flux --local my_config.yaml --filter dev.properties
++```
++With the following `dev.properties` file:
++
++```properties
++kafka.zookeeper.hosts: localhost:2181
++```
++
++You would then be able to reference those properties by key in your `.yaml` file using `${}` syntax:
++
++```yaml
++  - id: "zkHosts"
++    className: "storm.kafka.ZkHosts"
++    constructorArgs:
++      - "${kafka.zookeeper.hosts}"
++```
++
++In this case, Flux would replace `${kafka.zookeeper.hosts}` with `localhost:2181` before parsing the YAML contents.
++
++### Environment Variable Substitution/Filtering
++Flux also allows environment variable substitution. For example, if an environment variable named `ZK_HOSTS` is defined,
++you can reference it in a Flux YAML file with the following syntax:
++
++```
++${ENV-ZK_HOSTS}
++```
++
++## Components
++Components are essentially named object instances that are made available as configuration options for spouts and
++bolts. If you are familiar with the Spring framework, components are roughly analogous to Spring beans.
++
++Every component is identified, at a minimum, by a unique identifier (String) and a class name (String). For example,
++the following will make an instance of the `storm.kafka.StringScheme` class available as a reference under the key
++`"stringScheme"` . This assumes the `storm.kafka.StringScheme` has a default constructor.
++
++```yaml
++components:
++  - id: "stringScheme"
++    className: "storm.kafka.StringScheme"
++```
++
++### Constructor Arguments, References, Properties and Configuration Methods
++
++#### Constructor Arguments
++Arguments to a class constructor can be configured by adding a `constructorArgs` element to a component.
++`constructorArgs` is a list of objects that will be passed to the class' constructor. The following example creates an
++object by calling the constructor that takes a single string as an argument:
++
++```yaml
++  - id: "zkHosts"
++    className: "storm.kafka.ZkHosts"
++    constructorArgs:
++      - "localhost:2181"
++```
++
++#### References
++Each component instance is identified by a unique id that allows it to be used/reused by other components. To
++reference an existing component, you specify the id of the component with the `ref` tag.
++
++In the following example, a component with the id `"stringScheme"` is created, and later referenced, as an argument
++to another component's constructor:
++
++```yaml
++components:
++  - id: "stringScheme"
++    className: "storm.kafka.StringScheme"
++
++  - id: "stringMultiScheme"
++    className: "backtype.storm.spout.SchemeAsMultiScheme"
++    constructorArgs:
++      - ref: "stringScheme" # component with id "stringScheme" must be declared above.
++```
++**N.B.:** References can only be used after (below) the object they point to has been declared.
++
++#### Properties
++In addition to calling constructors with different arguments, Flux also allows you to configure components using
++JavaBean-like setter methods and fields declared as `public`:
++
++```yaml
++  - id: "spoutConfig"
++    className: "storm.kafka.SpoutConfig"
++    constructorArgs:
++      # brokerHosts
++      - ref: "zkHosts"
++      # topic
++      - "myKafkaTopic"
++      # zkRoot
++      - "/kafkaSpout"
++      # id
++      - "myId"
++    properties:
++      - name: "forceFromStart"
++        value: true
++      - name: "scheme"
++        ref: "stringMultiScheme"
++```
++
++In the example above, the `properties` declaration will cause Flux to look for a public method in the `SpoutConfig` with
++the signature `setForceFromStart(boolean b)` and attempt to invoke it. If a setter method is not found, Flux will then
++look for a public instance variable with the name `forceFromStart` and attempt to set its value.
++
++References may also be used as property values.
++
++#### Configuration Methods
++Conceptually, configuration methods are similar to Properties and Constructor Args -- they allow you to invoke an
++arbitrary method on an object after it is constructed. Configuration methods are useful for working with classes that
++don't expose JavaBean methods or have constructors that can fully configure the object. Common examples include classes
++that use the builder pattern for configuration/composition.
++
++The following YAML example creates a bolt and configures it by calling several methods:
++
++```yaml
++bolts:
++  - id: "bolt-1"
++    className: "org.apache.storm.flux.test.TestBolt"
++    parallelism: 1
++    configMethods:
++      - name: "withFoo"
++        args:
++          - "foo"
++      - name: "withBar"
++        args:
++          - "bar"
++      - name: "withFooBar"
++        args:
++          - "foo"
++          - "bar"
++```
++
++The signatures of the corresponding methods are as follows:
++
++```java
++    public void withFoo(String foo);
++    public void withBar(String bar);
++    public void withFooBar(String foo, String bar);
++```
++
++Arguments passed to configuration methods work much the same way as constructor arguments, and support references as
++well.
++
++### Using Java `enum`s in Constructor Arguments, References, Properties and Configuration Methods
++You can easily use Java `enum` values as arguments in a Flux YAML file, simply by referencing the name of the `enum`.
++
++For example, Storm's HDFS module includes the following `enum` definition (simplified for brevity):
++
++```java
++public static enum Units {
++    KB, MB, GB, TB
++}
++```
++
++And the `org.apache.storm.hdfs.bolt.rotation.FileSizeRotationPolicy` class has the following constructor:
++
++```java
++public FileSizeRotationPolicy(float count, Units units)
++
++```
++The following Flux `component` definition could be used to call the constructor:
++
++```yaml
++  - id: "rotationPolicy"
++    className: "org.apache.storm.hdfs.bolt.rotation.FileSizeRotationPolicy"
++    constructorArgs:
++      - 5.0
++      - MB
++```
++
++The above definition is functionally equivalent to the following Java code:
++
++```java
++// rotate files when they reach 5MB
++FileRotationPolicy rotationPolicy = new FileSizeRotationPolicy(5.0f, Units.MB);
++```
++
++## Topology Config
++The `config` section is simply a map of Storm topology configuration parameters that will be passed to the
++`backtype.storm.StormSubmitter` as an instance of the `backtype.storm.Config` class:
++
++```yaml
++config:
++  topology.workers: 4
++  topology.max.spout.pending: 1000
++  topology.message.timeout.secs: 30
++```
++
++# Existing Topologies
++If you have existing Storm topologies, you can still use Flux to deploy/run/test them. This feature allows you to
++leverage Flux Constructor Arguments, References, Properties, and Topology Config declarations for existing topology
++classes.
++
++The easiest way to use an existing topology class is to define
++a `getTopology()` instance method with one of the following signatures:
++
++```java
++public StormTopology getTopology(Map<String, Object> config)
++```
++or:
++
++```java
++public StormTopology getTopology(Config config)
++```
++
++You could then use the following YAML to configure your topology:
++
++```yaml
++name: "existing-topology"
++topologySource:
++  className: "org.apache.storm.flux.test.SimpleTopology"
++```
++
++If the class you would like to use as a topology source has a different method name (i.e. not `getTopology`), you can
++override it:
++
++```yaml
++name: "existing-topology"
++topologySource:
++  className: "org.apache.storm.flux.test.SimpleTopology"
++  methodName: "getTopologyWithDifferentMethodName"
++```
++
++__N.B.:__ The specified method must accept a single argument of type `java.util.Map<String, Object>` or
++`backtype.storm.Config`, and return a `backtype.storm.generated.StormTopology` object.
++
++# YAML DSL
++## Spouts and Bolts
++Spout and Bolts are configured in their own respective section of the YAML configuration. Spout and Bolt definitions
++are extensions to the `component` definition that add a `parallelism` parameter that sets the parallelism  for a
++component when the topology is deployed.
++
++Because spout and bolt definitions extend `component` they support constructor arguments, references, and properties as
++well.
++
++Shell spout example:
++
++```yaml
++spouts:
++  - id: "sentence-spout"
++    className: "org.apache.storm.flux.spouts.GenericShellSpout"
++    # shell spout constructor takes 2 arguments: String[], String[]
++    constructorArgs:
++      # command line
++      - ["node", "randomsentence.js"]
++      # output fields
++      - ["word"]
++    parallelism: 1
++```
++
++Kafka spout example:
++
++```yaml
++components:
++  - id: "stringScheme"
++    className: "storm.kafka.StringScheme"
++
++  - id: "stringMultiScheme"
++    className: "backtype.storm.spout.SchemeAsMultiScheme"
++    constructorArgs:
++      - ref: "stringScheme"
++
++  - id: "zkHosts"
++    className: "storm.kafka.ZkHosts"
++    constructorArgs:
++      - "localhost:2181"
++
++# Alternative kafka config
++#  - id: "kafkaConfig"
++#    className: "storm.kafka.KafkaConfig"
++#    constructorArgs:
++#      # brokerHosts
++#      - ref: "zkHosts"
++#      # topic
++#      - "myKafkaTopic"
++#      # clientId (optional)
++#      - "myKafkaClientId"
++
++  - id: "spoutConfig"
++    className: "storm.kafka.SpoutConfig"
++    constructorArgs:
++      # brokerHosts
++      - ref: "zkHosts"
++      # topic
++      - "myKafkaTopic"
++      # zkRoot
++      - "/kafkaSpout"
++      # id
++      - "myId"
++    properties:
++      - name: "forceFromStart"
++        value: true
++      - name: "scheme"
++        ref: "stringMultiScheme"
++
++config:
++  topology.workers: 1
++
++# spout definitions
++spouts:
++  - id: "kafka-spout"
++    className: "storm.kafka.KafkaSpout"
++    constructorArgs:
++      - ref: "spoutConfig"
++
++```
++
++Bolt Examples:
++
++```yaml
++# bolt definitions
++bolts:
++  - id: "splitsentence"
++    className: "org.apache.storm.flux.bolts.GenericShellBolt"
++    constructorArgs:
++      # command line
++      - ["python", "splitsentence.py"]
++      # output fields
++      - ["word"]
++    parallelism: 1
++    # ...
++
++  - id: "log"
++    className: "org.apache.storm.flux.wrappers.bolts.LogInfoBolt"
++    parallelism: 1
++    # ...
++
++  - id: "count"
++    className: "backtype.storm.testing.TestWordCounter"
++    parallelism: 1
++    # ...
++```
++## Streams and Stream Groupings
++Streams in Flux are represented as a list of connections (Graph edges, data flow, etc.) between the Spouts and Bolts in
++a topology, with an associated Grouping definition.
++
++A Stream definition has the following properties:
++
++**`name`:** A name for the connection (optional, currently unused)
++
++**`from`:** The `id` of a Spout or Bolt that is the source (publisher)
++
++**`to`:** The `id` of a Spout or Bolt that is the destination (subscriber)
++
++**`grouping`:** The stream grouping definition for the Stream
++
++A Grouping definition has the following properties:
++
++**`type`:** The type of grouping. One of `ALL`,`CUSTOM`,`DIRECT`,`SHUFFLE`,`LOCAL_OR_SHUFFLE`,`FIELDS`,`GLOBAL`, or `NONE`.
++
++**`streamId`:** The Storm stream ID (Optional. If unspecified will use the default stream)
++
++**`args`:** For the `FIELDS` grouping, a list of field names.
++
++**`customClass`:** For the `CUSTOM` grouping, a definition of custom grouping class instance
++
++The `streams` definition example below sets up a topology with the following wiring:
++
++```
++    kafka-spout --> splitsentence --> count --> log
++```
++
++
++```yaml
++#stream definitions
++# stream definitions define connections between spouts and bolts.
++# note that such connections can be cyclical
++# custom stream groupings are also supported
++
++streams:
++  - name: "kafka --> split" # name isn't used (placeholder for logging, UI, etc.)
++    from: "kafka-spout"
++    to: "splitsentence"
++    grouping:
++      type: SHUFFLE
++
++  - name: "split --> count"
++    from: "splitsentence"
++    to: "count"
++    grouping:
++      type: FIELDS
++      args: ["word"]
++
++  - name: "count --> log"
++    from: "count"
++    to: "log"
++    grouping:
++      type: SHUFFLE
++```
++
++### Custom Stream Groupings
++Custom stream groupings are defined by setting the grouping type to `CUSTOM` and defining a `customClass` parameter
++that tells Flux how to instantiate the custom class. The `customClass` definition extends `component`, so it supports
++constructor arguments, references, and properties as well.
++
++The example below creates a Stream with an instance of the `backtype.storm.testing.NGrouping` custom stream grouping
++class.
++
++```yaml
++  - name: "bolt-1 --> bolt2"
++    from: "bolt-1"
++    to: "bolt-2"
++    grouping:
++      type: CUSTOM
++      customClass:
++        className: "backtype.storm.testing.NGrouping"
++        constructorArgs:
++          - 1
++```
++
++## Includes and Overrides
++Flux allows you to include the contents of other YAML files, and have them treated as though they were defined in the
++same file. Includes may be either files, or classpath resources.
++
++Includes are specified as a list of maps:
++
++```yaml
++includes:
++  - resource: false
++    file: "src/test/resources/configs/shell_test.yaml"
++    override: false
++```
++
++If the `resource` property is set to `true`, the include will be loaded as a classpath resource from the value of the
++`file` attribute, otherwise it will be treated as a regular file.
++
++The `override` property controls how includes affect the values defined in the current file. If `override` is set to
++`true`, values in the included file will replace values in the current file being parsed. If `override` is set to
++`false`, values in the current file being parsed will take precedence, and the parser will refuse to replace them.
++
++**N.B.:** Includes are not yet recursive. Includes from included files will be ignored.
++
++
++## Basic Word Count Example
++
++This example uses a spout implemented in JavaScript, a bolt implemented in Python, and a bolt implemented in Java
++
++Topology YAML config:
++
++```yaml
++---
++name: "shell-topology"
++config:
++  topology.workers: 1
++
++# spout definitions
++spouts:
++  - id: "sentence-spout"
++    className: "org.apache.storm.flux.spouts.GenericShellSpout"
++    # shell spout constructor takes 2 arguments: String[], String[]
++    constructorArgs:
++      # command line
++      - ["node", "randomsentence.js"]
++      # output fields
++      - ["word"]
++    parallelism: 1
++
++# bolt definitions
++bolts:
++  - id: "splitsentence"
++    className: "org.apache.storm.flux.bolts.GenericShellBolt"
++    constructorArgs:
++      # command line
++      - ["python", "splitsentence.py"]
++      # output fields
++      - ["word"]
++    parallelism: 1
++
++  - id: "log"
++    className: "org.apache.storm.flux.wrappers.bolts.LogInfoBolt"
++    parallelism: 1
++
++  - id: "count"
++    className: "backtype.storm.testing.TestWordCounter"
++    parallelism: 1
++
++#stream definitions
++# stream definitions define connections between spouts and bolts.
++# note that such connections can be cyclical
++# custom stream groupings are also supported
++
++streams:
++  - name: "spout --> split" # name isn't used (placeholder for logging, UI, etc.)
++    from: "sentence-spout"
++    to: "splitsentence"
++    grouping:
++      type: SHUFFLE
++
++  - name: "split --> count"
++    from: "splitsentence"
++    to: "count"
++    grouping:
++      type: FIELDS
++      args: ["word"]
++
++  - name: "count --> log"
++    from: "count"
++    to: "log"
++    grouping:
++      type: SHUFFLE
++```
++
++
++## Micro-Batching (Trident) API Support
++Currently, the Flux YAML DSL only supports the Core Storm API, but support for Storm's micro-batching API is planned.
++
++To use Flux with a Trident topology, define a topology getter method and reference it in your YAML config:
++
++```yaml
++name: "my-trident-topology"
++
++config:
++  topology.workers: 1
++
++topologySource:
++  className: "org.apache.storm.flux.test.TridentTopologySource"
++  # Flux will look for "getTopology", this will override that.
++  methodName: "getTopologyWithDifferentMethodName"
++```
++
++## Author
++P. Taylor Goetz
++
++## Contributors
++
++
++## Contributing
++
++Contributions in any form are more than welcome.
++
++The intent of this project is that it will be donated to Apache Storm.
++
++By offering any contributions to this project, you should be willing and able to submit an
++[Apache ICLA](http://www.apache.org/licenses/icla.txt), if you have not done so already.

http://git-wip-us.apache.org/repos/asf/storm/blob/b21a98dd/external/flux/flux-core/pom.xml
----------------------------------------------------------------------
diff --cc external/flux/flux-core/pom.xml
index 0000000,0000000..600613d
new file mode 100644
--- /dev/null
+++ b/external/flux/flux-core/pom.xml
@@@ -1,0 -1,0 +1,94 @@@
++<?xml version="1.0" encoding="UTF-8"?>
++<!--
++ Licensed to the Apache Software Foundation (ASF) under one or more
++ contributor license agreements.  See the NOTICE file distributed with
++ this work for additional information regarding copyright ownership.
++ The ASF licenses this file to You under the Apache License, Version 2.0
++ (the "License"); you may not use this file except in compliance with
++ the License.  You may obtain a copy of the License at
++
++     http://www.apache.org/licenses/LICENSE-2.0
++
++ Unless required by applicable law or agreed to in writing, software
++ distributed under the License is distributed on an "AS IS" BASIS,
++ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
++ See the License for the specific language governing permissions and
++ limitations under the License.
++-->
++<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
++    <modelVersion>4.0.0</modelVersion>
++
++    <!-- Inherits version/property management from the flux parent pom one
++         directory up (com.github.ptgoetz:flux). -->
++    <parent>
++        <groupId>com.github.ptgoetz</groupId>
++        <artifactId>flux</artifactId>
++        <version>0.3.1-SNAPSHOT</version>
++        <relativePath>../pom.xml</relativePath>
++    </parent>
++
++    <groupId>com.github.ptgoetz</groupId>
++    <artifactId>flux-core</artifactId>
++    <packaging>jar</packaging>
++
++    <name>flux-core</name>
++    <url>https://github.com/ptgoetz/flux</url>
++
++    <dependencies>
++        <dependency>
++            <groupId>com.github.ptgoetz</groupId>
++            <artifactId>flux-wrappers</artifactId>
++            <version>${project.version}</version>
++        </dependency>
++        <!-- The storm-* external modules below are test-scoped: they are only
++             used by the flux-core test suite (Kafka/HDFS/HBase examples). -->
++        <dependency>
++            <groupId>org.apache.storm</groupId>
++            <artifactId>storm-kafka</artifactId>
++            <version>${storm.version}</version>
++            <scope>test</scope>
++        </dependency>
++        <dependency>
++            <groupId>org.apache.storm</groupId>
++            <artifactId>storm-hdfs</artifactId>
++            <version>${storm.version}</version>
++            <scope>test</scope>
++        </dependency>
++        <dependency>
++            <groupId>org.apache.storm</groupId>
++            <artifactId>storm-hbase</artifactId>
++            <version>${storm.version}</version>
++            <scope>test</scope>
++        </dependency>
++    </dependencies>
++    <build>
++        <!-- Enable Maven resource filtering so property placeholders inside
++             src/main/resources are expanded at build time. -->
++        <resources>
++            <resource>
++                <directory>src/main/resources</directory>
++                <filtering>true</filtering>
++            </resource>
++        </resources>
++        <plugins>
++        <!-- Build a shaded (uber) jar at package time so the artifact is
++             self-contained; the manifest transformer sets Flux as the main
++             class so "storm jar" can invoke it directly. -->
++        <plugin>
++            <groupId>org.apache.maven.plugins</groupId>
++            <artifactId>maven-shade-plugin</artifactId>
++            <version>1.4</version>
++            <configuration>
++                <createDependencyReducedPom>true</createDependencyReducedPom>
++            </configuration>
++            <executions>
++                <execution>
++                    <phase>package</phase>
++                    <goals>
++                        <goal>shade</goal>
++                    </goals>
++                    <configuration>
++                        <transformers>
++                            <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer" />
++                            <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
++                                <mainClass>org.apache.storm.flux.Flux</mainClass>
++                            </transformer>
++                        </transformers>
++                    </configuration>
++                </execution>
++            </executions>
++        </plugin>
++        </plugins>
++    </build>
++</project>

http://git-wip-us.apache.org/repos/asf/storm/blob/b21a98dd/external/flux/flux-core/src/main/java/org/apache/storm/flux/Flux.java
----------------------------------------------------------------------
diff --cc external/flux/flux-core/src/main/java/org/apache/storm/flux/Flux.java
index 0000000,0000000..6300631
new file mode 100644
--- /dev/null
+++ b/external/flux/flux-core/src/main/java/org/apache/storm/flux/Flux.java
@@@ -1,0 -1,0 +1,263 @@@
++/*
++ * Licensed to the Apache Software Foundation (ASF) under one
++ * or more contributor license agreements.  See the NOTICE file
++ * distributed with this work for additional information
++ * regarding copyright ownership.  The ASF licenses this file
++ * to you under the Apache License, Version 2.0 (the
++ * "License"); you may not use this file except in compliance
++ * with the License.  You may obtain a copy of the License at
++ *
++ * http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing, software
++ * distributed under the License is distributed on an "AS IS" BASIS,
++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
++ * See the License for the specific language governing permissions and
++ * limitations under the License.
++ */
++package org.apache.storm.flux;
++
++import backtype.storm.Config;
++import backtype.storm.LocalCluster;
++import backtype.storm.StormSubmitter;
++import backtype.storm.generated.StormTopology;
++import backtype.storm.generated.SubmitOptions;
++import backtype.storm.generated.TopologyInitialStatus;
++import backtype.storm.utils.Utils;
++import org.apache.commons.cli.*;
++import org.apache.storm.flux.model.*;
++import org.apache.storm.flux.parser.FluxParser;
++import org.slf4j.Logger;
++import org.slf4j.LoggerFactory;
++
++import java.io.*;
++import java.util.Map;
++import java.util.Properties;
++
++/**
++ * Flux entry point.
++ *
++ */
++public class Flux {
++    private static final Logger LOG = LoggerFactory.getLogger(Flux.class);
++
++    // How long (ms) a local-mode topology runs before it is killed and the
++    // local cluster is shut down, unless overridden with --sleep.
++    private static final Long DEFAULT_LOCAL_SLEEP_TIME = 60000l;
++
++    // Default ZooKeeper client port, used when --zookeeper supplies a bare host.
++    private static final Long DEFAULT_ZK_PORT = 2181l;
++
++    // Long option names for the command-line interface.
++    private static final String OPTION_LOCAL = "local";
++    private static final String OPTION_REMOTE = "remote";
++    private static final String OPTION_RESOURCE = "resource";
++    private static final String OPTION_SLEEP = "sleep";
++    private static final String OPTION_DRY_RUN = "dry-run";
++    private static final String OPTION_NO_DETAIL = "no-detail";
++    private static final String OPTION_NO_SPLASH = "no-splash";
++    private static final String OPTION_INACTIVE = "inactive";
++    private static final String OPTION_ZOOKEEPER = "zookeeper";
++    private static final String OPTION_FILTER = "filter";
++    private static final String OPTION_ENV_FILTER = "env-filter";
++
++    /**
++     * CLI entry point: builds the Commons CLI option set, parses the
++     * arguments, and delegates to {@code runCli(CommandLine)}. Exits with
++     * status 1 when the single required YAML file/resource argument is
++     * missing or more than one is given.
++     */
++    public static void main(String[] args) throws Exception {
++        Options options = new Options();
++
++        options.addOption(option(0, "l", OPTION_LOCAL, "Run the topology in local mode."));
++
++        options.addOption(option(0, "r", OPTION_REMOTE, "Deploy the topology to a remote cluster."));
++
++        options.addOption(option(0, "R", OPTION_RESOURCE, "Treat the supplied path as a classpath resource instead of a file."));
++
++        options.addOption(option(1, "s", OPTION_SLEEP, "ms", "When running locally, the amount of time to sleep (in ms.) " +
++                "before killing the topology and shutting down the local cluster."));
++
++        options.addOption(option(0, "d", OPTION_DRY_RUN, "Do not run or deploy the topology. Just build, validate, " +
++                "and print information about the topology."));
++
++        options.addOption(option(0, "q", OPTION_NO_DETAIL, "Suppress the printing of topology details."));
++
++        options.addOption(option(0, "n", OPTION_NO_SPLASH, "Suppress the printing of the splash screen."));
++
++        options.addOption(option(0, "i", OPTION_INACTIVE, "Deploy the topology, but do not activate it."));
++
++        options.addOption(option(1, "z", OPTION_ZOOKEEPER, "host:port", "When running in local mode, use the ZooKeeper at the " +
++                "specified <host>:<port> instead of the in-process ZooKeeper. (requires Storm 0.9.3 or later)"));
++
++        // NOTE(review): this help text says "{$[property name]}" but the README
++        // documents the "${property.name}" syntax -- confirm which is correct.
++        options.addOption(option(1, "f", OPTION_FILTER, "file", "Perform property substitution. Use the specified file " +
++                "as a source of properties, and replace keys identified with {$[property name]} with the value defined " +
++                "in the properties file."));
++
++        // NOTE(review): description grammar is off ("Replace keys identified
++        // ... will be replaced") and a space is missing after "keys" -- consider rewording.
++        options.addOption(option(0, "e", OPTION_ENV_FILTER, "Perform environment variable substitution. Replace keys" +
++                "identified with `${ENV-[NAME]}` will be replaced with the corresponding `NAME` environment value"));
++
++        CommandLineParser parser = new BasicParser();
++        CommandLine cmd = parser.parse(options, args);
++
++        // Exactly one positional argument (the topology YAML) is required.
++        if (cmd.getArgs().length != 1) {
++            usage(options);
++            System.exit(1);
++        }
++        runCli(cmd);
++    }
++
++    // Convenience overload: the argument name defaults to the long option name.
++    private static Option option(int argCount, String shortName, String longName, String description){
++       return option(argCount, shortName, longName, longName, description);
++    }
++
++    // Builds a Commons CLI Option with the given number of arguments, short
++    // and long names, argument display name, and help description.
++    private static Option option(int argCount, String shortName, String longName, String argName, String description){
++        Option option = OptionBuilder.hasArgs(argCount)
++                .withArgName(argName)
++                .withLongOpt(longName)
++                .withDescription(description)
++                .create(shortName);
++        return option;
++    }
++
++    // Prints the usage banner showing how to launch Flux via "storm jar".
++    private static void usage(Options options) {
++        HelpFormatter formatter = new HelpFormatter();
++        formatter.printHelp("storm jar <my_topology_uber_jar.jar> " +
++                Flux.class.getName() +
++                " [options] <topology-config.yaml>", options);
++    }
++
++    /**
++     * Executes the parsed command line: parses the YAML topology definition
++     * (with optional property/env filtering), builds the topology, prints
++     * details, and then either deploys it remotely or runs it on a
++     * LocalCluster, depending on the options given.
++     */
++    private static void runCli(CommandLine cmd)throws Exception {
++        if(!cmd.hasOption(OPTION_NO_SPLASH)) {
++            printSplash();
++        }
++
++        // NOTE(review): "dump-yaml" is never registered in the Options built in
++        // main(), so this flag appears to always be false -- confirm intent.
++        boolean dumpYaml = cmd.hasOption("dump-yaml");
++
++        TopologyDef topologyDef = null;
++        String filePath = (String)cmd.getArgList().get(0);
++
++        // TODO conditionally load properties from a file our resource
++        String filterProps = null;
++        if(cmd.hasOption(OPTION_FILTER)){
++            filterProps = cmd.getOptionValue(OPTION_FILTER);
++        }
++
++
++        // Parse the topology definition from either a classpath resource or a file.
++        boolean envFilter = cmd.hasOption(OPTION_ENV_FILTER);
++        if(cmd.hasOption(OPTION_RESOURCE)){
++            printf("Parsing classpath resource: %s", filePath);
++            topologyDef = FluxParser.parseResource(filePath, dumpYaml, true, filterProps, envFilter);
++        } else {
++            printf("Parsing file: %s",
++                    new File(filePath).getAbsolutePath());
++            topologyDef = FluxParser.parseFile(filePath, dumpYaml, true, filterProps, envFilter);
++        }
++
++
++        String topologyName = topologyDef.getName();
++        // merge contents of `config` into topology config
++        Config conf = FluxBuilder.buildConfig(topologyDef);
++        ExecutionContext context = new ExecutionContext(topologyDef, conf);
++        StormTopology topology = FluxBuilder.buildTopology(context);
++
++        if(!cmd.hasOption(OPTION_NO_DETAIL)){
++            printTopologyInfo(context);
++        }
++
++        if(!cmd.hasOption(OPTION_DRY_RUN)) {
++            if (cmd.hasOption(OPTION_REMOTE)) {
++                LOG.info("Running remotely...");
++                // Submission failures are logged as warnings rather than
++                // propagated; the process still exits normally in that case.
++                try {
++                    // should the topology be active or inactive
++                    SubmitOptions submitOptions = null;
++                    if(cmd.hasOption(OPTION_INACTIVE)){
++                        LOG.info("Deploying topology in an INACTIVE state...");
++                        submitOptions = new SubmitOptions(TopologyInitialStatus.INACTIVE);
++                    } else {
++                        LOG.info("Deploying topology in an ACTIVE state...");
++                        submitOptions = new SubmitOptions(TopologyInitialStatus.ACTIVE);
++                    }
++                    StormSubmitter.submitTopology(topologyName, conf, topology, submitOptions, null);
++                } catch (Exception e) {
++                    LOG.warn("Unable to deploy topology to remote cluster.", e);
++                }
++            } else {
++                LOG.info("Running in local mode...");
++
++                // Run time defaults to DEFAULT_LOCAL_SLEEP_TIME unless --sleep is given.
++                String sleepStr = cmd.getOptionValue(OPTION_SLEEP);
++                Long sleepTime = DEFAULT_LOCAL_SLEEP_TIME;
++                if (sleepStr != null) {
++                    sleepTime = Long.parseLong(sleepStr);
++                }
++                LOG.debug("Sleep time: {}", sleepTime);
++                LocalCluster cluster = null;
++
++                // in-process or external zookeeper
++                if(cmd.hasOption(OPTION_ZOOKEEPER)){
++                    String zkStr = cmd.getOptionValue(OPTION_ZOOKEEPER);
++                    LOG.info("Using ZooKeeper at '{}' instead of in-process one.", zkStr);
++                    // Split "host:port"; a bare host falls back to DEFAULT_ZK_PORT.
++                    long zkPort = DEFAULT_ZK_PORT;
++                    String zkHost = null;
++                    if(zkStr.contains(":")){
++                        String[] hostPort = zkStr.split(":");
++                        zkHost = hostPort[0];
++                        zkPort = hostPort.length > 1 ? Long.parseLong(hostPort[1]) : DEFAULT_ZK_PORT;
++
++                    } else {
++                        zkHost = zkStr;
++                    }
++                    // the following constructor is only available in 0.9.3 and later
++                    try {
++                        cluster = new LocalCluster(zkHost, zkPort);
++                    } catch (NoSuchMethodError e){
++                        LOG.error("The --zookeeper option can only be used with Apache Storm 0.9.3 and later.");
++                        System.exit(1);
++                    }
++                } else {
++                    cluster = new LocalCluster();
++                }
++                // Run for the configured time, then tear everything down.
++                cluster.submitTopology(topologyName, conf, topology);
++
++                Utils.sleep(sleepTime);
++                cluster.killTopology(topologyName);
++                cluster.shutdown();
++            }
++        }
++    }
++
++    // Prints a human-readable summary (spouts, bolts, streams) of a topology
++    // built from the YAML DSL; topologies from a TopologySource are skipped.
++    static void printTopologyInfo(ExecutionContext ctx){
++        TopologyDef t = ctx.getTopologyDef();
++        if(t.isDslTopology()) {
++            print("---------- TOPOLOGY DETAILS ----------");
++
++            printf("Topology Name: %s", t.getName());
++            print("--------------- SPOUTS ---------------");
++            for (SpoutDef s : t.getSpouts()) {
++                printf("%s [%d] (%s)", s.getId(), s.getParallelism(), s.getClassName());
++            }
++            print("---------------- BOLTS ---------------");
++            for (BoltDef b : t.getBolts()) {
++                printf("%s [%d] (%s)", b.getId(), b.getParallelism(), b.getClassName());
++            }
++
++            print("--------------- STREAMS ---------------");
++            for (StreamDef sd : t.getStreams()) {
++                printf("%s --%s--> %s", sd.getFrom(), sd.getGrouping().getType(), sd.getTo());
++            }
++            print("--------------------------------------");
++        }
++    }
++
++    // save a little typing
++    private static void printf(String format, Object... args){
++        print(String.format(format, args));
++    }
++
++    private static void print(String string){
++        System.out.println(string);
++    }
++
++    // Prints the ASCII-art banner from the /splash.txt classpath resource,
++    // silently doing nothing if the resource is absent.
++    // NOTE(review): the reader/stream is never closed; harmless at startup,
++    // but try-with-resources would be cleaner.
++    private static void printSplash() throws IOException {
++        // banner
++        InputStream is = Flux.class.getResourceAsStream("/splash.txt");
++        if(is != null){
++            BufferedReader br = new BufferedReader(new InputStreamReader(is));
++            String line = null;
++            while((line = br.readLine()) != null){
++                System.out.println(line);
++            }
++        }
++    }
++}