You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by pt...@apache.org on 2015/06/03 19:35:48 UTC
[34/50] [abbrv] storm git commit: merge flux into external/flux/
merge flux into external/flux/
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/b21a98dd
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/b21a98dd
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/b21a98dd
Branch: refs/heads/master
Commit: b21a98dd87f82a06a6295ab2bfd832c2810ca57e
Parents: ea0fe12 b372a11
Author: P. Taylor Goetz <pt...@gmail.com>
Authored: Wed May 6 13:31:04 2015 -0400
Committer: P. Taylor Goetz <pt...@gmail.com>
Committed: Wed May 6 13:31:04 2015 -0400
----------------------------------------------------------------------
external/flux/.gitignore | 15 +
external/flux/LICENSE | 202 +++++
external/flux/README.md | 845 +++++++++++++++++++
external/flux/flux-core/pom.xml | 94 +++
.../main/java/org/apache/storm/flux/Flux.java | 263 ++++++
.../java/org/apache/storm/flux/FluxBuilder.java | 591 +++++++++++++
.../apache/storm/flux/api/TopologySource.java | 39 +
.../org/apache/storm/flux/model/BeanDef.java | 39 +
.../apache/storm/flux/model/BeanReference.java | 39 +
.../org/apache/storm/flux/model/BoltDef.java | 24 +
.../storm/flux/model/ConfigMethodDef.java | 62 ++
.../storm/flux/model/ExecutionContext.java | 77 ++
.../apache/storm/flux/model/GroupingDef.java | 77 ++
.../org/apache/storm/flux/model/IncludeDef.java | 54 ++
.../org/apache/storm/flux/model/ObjectDef.java | 90 ++
.../apache/storm/flux/model/PropertyDef.java | 58 ++
.../org/apache/storm/flux/model/SpoutDef.java | 24 +
.../org/apache/storm/flux/model/StreamDef.java | 64 ++
.../apache/storm/flux/model/TopologyDef.java | 216 +++++
.../storm/flux/model/TopologySourceDef.java | 36 +
.../org/apache/storm/flux/model/VertexDef.java | 36 +
.../apache/storm/flux/parser/FluxParser.java | 202 +++++
.../flux-core/src/main/resources/splash.txt | 9 +
.../org/apache/storm/flux/FluxBuilderTest.java | 31 +
.../org/apache/storm/flux/IntegrationTest.java | 41 +
.../java/org/apache/storm/flux/TCKTest.java | 234 +++++
.../multilang/MultilangEnvirontmentTest.java | 89 ++
.../apache/storm/flux/test/SimpleTopology.java | 42 +
.../storm/flux/test/SimpleTopologySource.java | 35 +
.../test/SimpleTopologyWithConfigParam.java | 38 +
.../org/apache/storm/flux/test/TestBolt.java | 63 ++
.../storm/flux/test/TridentTopologySource.java | 54 ++
.../src/test/resources/configs/bad_hbase.yaml | 98 +++
.../resources/configs/config-methods-test.yaml | 70 ++
.../existing-topology-method-override.yaml | 10 +
.../existing-topology-reflection-config.yaml | 9 +
.../configs/existing-topology-reflection.yaml | 9 +
.../configs/existing-topology-trident.yaml | 9 +
.../resources/configs/existing-topology.yaml | 8 +
.../src/test/resources/configs/hdfs_test.yaml | 97 +++
.../test/resources/configs/include_test.yaml | 25 +
.../configs/invalid-existing-topology.yaml | 17 +
.../src/test/resources/configs/kafka_test.yaml | 126 +++
.../src/test/resources/configs/shell_test.yaml | 104 +++
.../test/resources/configs/simple_hbase.yaml | 120 +++
.../resources/configs/substitution-test.yaml | 106 +++
.../src/test/resources/configs/tck.yaml | 95 +++
.../src/test/resources/configs/test.properties | 2 +
.../flux-core/src/test/resources/logback.xml | 30 +
external/flux/flux-examples/README.md | 68 ++
external/flux/flux-examples/pom.xml | 87 ++
.../storm/flux/examples/WordCountClient.java | 74 ++
.../apache/storm/flux/examples/WordCounter.java | 71 ++
.../src/main/resources/hbase_bolt.properties | 18 +
.../src/main/resources/hdfs_bolt.properties | 26 +
.../src/main/resources/kafka_spout.yaml | 136 +++
.../src/main/resources/multilang.yaml | 89 ++
.../src/main/resources/simple_hbase.yaml | 92 ++
.../src/main/resources/simple_hdfs.yaml | 105 +++
.../src/main/resources/simple_wordcount.yaml | 68 ++
external/flux/flux-ui/README.md | 3 +
external/flux/flux-wrappers/pom.xml | 35 +
.../flux/wrappers/bolts/FluxShellBolt.java | 56 ++
.../storm/flux/wrappers/bolts/LogInfoBolt.java | 44 +
.../flux/wrappers/spouts/FluxShellSpout.java | 55 ++
.../main/resources/resources/randomsentence.js | 93 ++
.../main/resources/resources/splitsentence.py | 24 +
.../src/main/resources/resources/storm.js | 373 ++++++++
.../src/main/resources/resources/storm.py | 260 ++++++
external/flux/pom.xml | 126 +++
70 files changed, 6621 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/b21a98dd/external/flux/.gitignore
----------------------------------------------------------------------
diff --cc external/flux/.gitignore
index 0000000,0000000..35fb1db
new file mode 100644
--- /dev/null
+++ b/external/flux/.gitignore
@@@ -1,0 -1,0 +1,15 @@@
++*.class
++**/target
++
++# Package Files #
++*.jar
++*.war
++*.ear
++
++# Intellij
++**/*.iml
++**/*.ipr
++**/*.iws
++
++# virtual machine crash logs, see http://www.java.com/en/download/help/error_hotspot.xml
++hs_err_pid*
http://git-wip-us.apache.org/repos/asf/storm/blob/b21a98dd/external/flux/LICENSE
----------------------------------------------------------------------
diff --cc external/flux/LICENSE
index 0000000,0000000..e06d208
new file mode 100644
--- /dev/null
+++ b/external/flux/LICENSE
@@@ -1,0 -1,0 +1,202 @@@
++Apache License
++ Version 2.0, January 2004
++ http://www.apache.org/licenses/
++
++ TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION
++
++ 1. Definitions.
++
++ "License" shall mean the terms and conditions for use, reproduction,
++ and distribution as defined by Sections 1 through 9 of this document.
++
++ "Licensor" shall mean the copyright owner or entity authorized by
++ the copyright owner that is granting the License.
++
++ "Legal Entity" shall mean the union of the acting entity and all
++ other entities that control, are controlled by, or are under common
++ control with that entity. For the purposes of this definition,
++ "control" means (i) the power, direct or indirect, to cause the
++ direction or management of such entity, whether by contract or
++ otherwise, or (ii) ownership of fifty percent (50%) or more of the
++ outstanding shares, or (iii) beneficial ownership of such entity.
++
++ "You" (or "Your") shall mean an individual or Legal Entity
++ exercising permissions granted by this License.
++
++ "Source" form shall mean the preferred form for making modifications,
++ including but not limited to software source code, documentation
++ source, and configuration files.
++
++ "Object" form shall mean any form resulting from mechanical
++ transformation or translation of a Source form, including but
++ not limited to compiled object code, generated documentation,
++ and conversions to other media types.
++
++ "Work" shall mean the work of authorship, whether in Source or
++ Object form, made available under the License, as indicated by a
++ copyright notice that is included in or attached to the work
++ (an example is provided in the Appendix below).
++
++ "Derivative Works" shall mean any work, whether in Source or Object
++ form, that is based on (or derived from) the Work and for which the
++ editorial revisions, annotations, elaborations, or other modifications
++ represent, as a whole, an original work of authorship. For the purposes
++ of this License, Derivative Works shall not include works that remain
++ separable from, or merely link (or bind by name) to the interfaces of,
++ the Work and Derivative Works thereof.
++
++ "Contribution" shall mean any work of authorship, including
++ the original version of the Work and any modifications or additions
++ to that Work or Derivative Works thereof, that is intentionally
++ submitted to Licensor for inclusion in the Work by the copyright owner
++ or by an individual or Legal Entity authorized to submit on behalf of
++ the copyright owner. For the purposes of this definition, "submitted"
++ means any form of electronic, verbal, or written communication sent
++ to the Licensor or its representatives, including but not limited to
++ communication on electronic mailing lists, source code control systems,
++ and issue tracking systems that are managed by, or on behalf of, the
++ Licensor for the purpose of discussing and improving the Work, but
++ excluding communication that is conspicuously marked or otherwise
++ designated in writing by the copyright owner as "Not a Contribution."
++
++ "Contributor" shall mean Licensor and any individual or Legal Entity
++ on behalf of whom a Contribution has been received by Licensor and
++ subsequently incorporated within the Work.
++
++ 2. Grant of Copyright License. Subject to the terms and conditions of
++ this License, each Contributor hereby grants to You a perpetual,
++ worldwide, non-exclusive, no-charge, royalty-free, irrevocable
++ copyright license to reproduce, prepare Derivative Works of,
++ publicly display, publicly perform, sublicense, and distribute the
++ Work and such Derivative Works in Source or Object form.
++
++ 3. Grant of Patent License. Subject to the terms and conditions of
++ this License, each Contributor hereby grants to You a perpetual,
++ worldwide, non-exclusive, no-charge, royalty-free, irrevocable
++ (except as stated in this section) patent license to make, have made,
++ use, offer to sell, sell, import, and otherwise transfer the Work,
++ where such license applies only to those patent claims licensable
++ by such Contributor that are necessarily infringed by their
++ Contribution(s) alone or by combination of their Contribution(s)
++ with the Work to which such Contribution(s) was submitted. If You
++ institute patent litigation against any entity (including a
++ cross-claim or counterclaim in a lawsuit) alleging that the Work
++ or a Contribution incorporated within the Work constitutes direct
++ or contributory patent infringement, then any patent licenses
++ granted to You under this License for that Work shall terminate
++ as of the date such litigation is filed.
++
++ 4. Redistribution. You may reproduce and distribute copies of the
++ Work or Derivative Works thereof in any medium, with or without
++ modifications, and in Source or Object form, provided that You
++ meet the following conditions:
++
++ (a) You must give any other recipients of the Work or
++ Derivative Works a copy of this License; and
++
++ (b) You must cause any modified files to carry prominent notices
++ stating that You changed the files; and
++
++ (c) You must retain, in the Source form of any Derivative Works
++ that You distribute, all copyright, patent, trademark, and
++ attribution notices from the Source form of the Work,
++ excluding those notices that do not pertain to any part of
++ the Derivative Works; and
++
++ (d) If the Work includes a "NOTICE" text file as part of its
++ distribution, then any Derivative Works that You distribute must
++ include a readable copy of the attribution notices contained
++ within such NOTICE file, excluding those notices that do not
++ pertain to any part of the Derivative Works, in at least one
++ of the following places: within a NOTICE text file distributed
++ as part of the Derivative Works; within the Source form or
++ documentation, if provided along with the Derivative Works; or,
++ within a display generated by the Derivative Works, if and
++ wherever such third-party notices normally appear. The contents
++ of the NOTICE file are for informational purposes only and
++ do not modify the License. You may add Your own attribution
++ notices within Derivative Works that You distribute, alongside
++ or as an addendum to the NOTICE text from the Work, provided
++ that such additional attribution notices cannot be construed
++ as modifying the License.
++
++ You may add Your own copyright statement to Your modifications and
++ may provide additional or different license terms and conditions
++ for use, reproduction, or distribution of Your modifications, or
++ for any such Derivative Works as a whole, provided Your use,
++ reproduction, and distribution of the Work otherwise complies with
++ the conditions stated in this License.
++
++ 5. Submission of Contributions. Unless You explicitly state otherwise,
++ any Contribution intentionally submitted for inclusion in the Work
++ by You to the Licensor shall be under the terms and conditions of
++ this License, without any additional terms or conditions.
++ Notwithstanding the above, nothing herein shall supersede or modify
++ the terms of any separate license agreement you may have executed
++ with Licensor regarding such Contributions.
++
++ 6. Trademarks. This License does not grant permission to use the trade
++ names, trademarks, service marks, or product names of the Licensor,
++ except as required for reasonable and customary use in describing the
++ origin of the Work and reproducing the content of the NOTICE file.
++
++ 7. Disclaimer of Warranty. Unless required by applicable law or
++ agreed to in writing, Licensor provides the Work (and each
++ Contributor provides its Contributions) on an "AS IS" BASIS,
++ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
++ implied, including, without limitation, any warranties or conditions
++ of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A
++ PARTICULAR PURPOSE. You are solely responsible for determining the
++ appropriateness of using or redistributing the Work and assume any
++ risks associated with Your exercise of permissions under this License.
++
++ 8. Limitation of Liability. In no event and under no legal theory,
++ whether in tort (including negligence), contract, or otherwise,
++ unless required by applicable law (such as deliberate and grossly
++ negligent acts) or agreed to in writing, shall any Contributor be
++ liable to You for damages, including any direct, indirect, special,
++ incidental, or consequential damages of any character arising as a
++ result of this License or out of the use or inability to use the
++ Work (including but not limited to damages for loss of goodwill,
++ work stoppage, computer failure or malfunction, or any and all
++ other commercial damages or losses), even if such Contributor
++ has been advised of the possibility of such damages.
++
++ 9. Accepting Warranty or Additional Liability. While redistributing
++ the Work or Derivative Works thereof, You may choose to offer,
++ and charge a fee for, acceptance of support, warranty, indemnity,
++ or other liability obligations and/or rights consistent with this
++ License. However, in accepting such obligations, You may act only
++ on Your own behalf and on Your sole responsibility, not on behalf
++ of any other Contributor, and only if You agree to indemnify,
++ defend, and hold each Contributor harmless for any liability
++ incurred by, or claims asserted against, such Contributor by reason
++ of your accepting any such warranty or additional liability.
++
++ END OF TERMS AND CONDITIONS
++
++ APPENDIX: How to apply the Apache License to your work.
++
++ To apply the Apache License to your work, attach the following
++ boilerplate notice, with the fields enclosed by brackets "{}"
++ replaced with your own identifying information. (Don't include
++ the brackets!) The text should be enclosed in the appropriate
++ comment syntax for the file format. We also recommend that a
++ file or class name and description of purpose be included on the
++ same "printed page" as the copyright notice for easier
++ identification within third-party archives.
++
++ Copyright {yyyy} {name of copyright owner}
++
++ Licensed under the Apache License, Version 2.0 (the "License");
++ you may not use this file except in compliance with the License.
++ You may obtain a copy of the License at
++
++ http://www.apache.org/licenses/LICENSE-2.0
++
++ Unless required by applicable law or agreed to in writing, software
++ distributed under the License is distributed on an "AS IS" BASIS,
++ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
++ See the License for the specific language governing permissions and
++ limitations under the License.
++
http://git-wip-us.apache.org/repos/asf/storm/blob/b21a98dd/external/flux/README.md
----------------------------------------------------------------------
diff --cc external/flux/README.md
index 0000000,0000000..6f27219
new file mode 100644
--- /dev/null
+++ b/external/flux/README.md
@@@ -1,0 -1,0 +1,845 @@@
++# flux
++A framework for creating and deploying Apache Storm streaming computations with less friction.
++
++## Definition
++**flux** |fləks| _noun_
++
++1. The action or process of flowing or flowing out
++2. Continuous change
++3. In physics, the rate of flow of a fluid, radiant energy, or particles across a given area
++4. A substance mixed with a solid to lower its melting point
++
++## Rationale
++Bad things happen when configuration is hard-coded. No one should have to recompile or repackage an application in
++order to change configuration.
++
++## About
++Flux is a framework and set of utilities that make defining and deploying Apache Storm topologies less painful and
++developer-intensive.
++
++Have you ever found yourself repeating this pattern?:
++
++```java
++
++public static void main(String[] args) throws Exception {
++ // logic to determine if we're running locally or not...
++ // create necessary config options...
++ boolean runLocal = shouldRunLocal();
++ if(runLocal){
++ LocalCluster cluster = new LocalCluster();
++ cluster.submitTopology(name, conf, topology);
++ } else {
++ StormSubmitter.submitTopology(name, conf, topology);
++ }
++}
++```
++
++Wouldn't something like this be easier:
++
++```bash
++storm jar mytopology.jar org.apache.storm.flux.Flux --local config.yaml
++```
++
++or:
++
++```bash
++storm jar mytopology.jar org.apache.storm.flux.Flux --remote config.yaml
++```
++
++Another pain point often mentioned is the fact that the wiring for a Topology graph is often tied up in Java code,
++and that any changes require recompilation and repackaging of the topology jar file. Flux aims to alleviate that
++pain by allowing you to package all your Storm components in a single jar, and use an external text file to define
++the layout and configuration of your topologies.
++
++## Features
++
++ * Easily configure and deploy Storm topologies (Both Storm core and Microbatch API) without embedding configuration
++ in your topology code
++ * Support for existing topology code (see below)
++ * Define Storm Core API (Spouts/Bolts) using a flexible YAML DSL
++ * YAML DSL support for most Storm components (storm-kafka, storm-hdfs, storm-hbase, etc.)
++ * Convenient support for multi-lang components
++ * External property substitution/filtering for easily switching between configurations/environments (similar to Maven-style
++ `${variable.name}` substitution)
++
++## Usage
++
++To use Flux, add it as a dependency and package all your Storm components in a fat jar, then create a YAML document
++to define your topology (see below for YAML configuration options).
++
++### Building from Source
++The easiest way to use Flux is to add it as a Maven dependency in your project as described below.
++
++If you would like to build Flux from source and run the unit/integration tests, you will need the following installed
++on your system:
++
++* Python 2.6.x or later
++* Node.js 0.10.x or later
++
++#### Building with unit tests enabled:
++
++```
++mvn clean install
++```
++
++#### Building with unit tests disabled:
++If you would like to build Flux without installing Python or Node.js you can simply skip the unit tests:
++
++```
++mvn clean install -DskipTests=true
++```
++
++Note that if you plan on using Flux to deploy topologies to a remote cluster, you will still need to have Python
++installed since it is required by Apache Storm.
++
++
++#### Building with integration tests enabled:
++
++```
++mvn clean install -DskipIntegration=false
++```
++
++
++### Packaging with Maven
++To enable Flux for your Storm components, you need to add it as a dependency such that it's included in the Storm
++topology jar. This can be accomplished with the Maven shade plugin (preferred) or the Maven assembly plugin (not
++recommended).
++
++#### Flux Maven Dependency
++The current version of Flux is available in Maven Central at the following coordinates:
++```xml
++<dependency>
++ <groupId>com.github.ptgoetz</groupId>
++ <artifactId>flux-core</artifactId>
++ <version>0.3.0</version>
++</dependency>
++```
++
++#### Creating a Flux-Enabled Topology JAR
++The example below illustrates Flux usage with the Maven shade plugin:
++
++ ```xml
++<!-- include Flux and user dependencies in the shaded jar -->
++<dependencies>
++ <!-- Flux include -->
++ <dependency>
++ <groupId>com.github.ptgoetz</groupId>
++ <artifactId>flux-core</artifactId>
++ <version>0.3.0</version>
++ </dependency>
++
++ <!-- add user dependencies here... -->
++
++</dependencies>
++<!-- create a fat jar that includes all dependencies -->
++<build>
++ <plugins>
++ <plugin>
++ <groupId>org.apache.maven.plugins</groupId>
++ <artifactId>maven-shade-plugin</artifactId>
++ <version>1.4</version>
++ <configuration>
++ <createDependencyReducedPom>true</createDependencyReducedPom>
++ </configuration>
++ <executions>
++ <execution>
++ <phase>package</phase>
++ <goals>
++ <goal>shade</goal>
++ </goals>
++ <configuration>
++ <transformers>
++ <transformer
++ implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
++ <transformer
++ implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
++ <mainClass>org.apache.storm.flux.Flux</mainClass>
++ </transformer>
++ </transformers>
++ </configuration>
++ </execution>
++ </executions>
++ </plugin>
++ </plugins>
++</build>
++ ```
++
++### Deploying and Running a Flux Topology
++Once your topology components are packaged with the Flux dependency, you can run different topologies either locally
++or remotely using the `storm jar` command. For example, if your fat jar is named `myTopology-0.1.0-SNAPSHOT.jar` you
++could run it locally with the command:
++
++
++```bash
++storm jar myTopology-0.1.0-SNAPSHOT.jar org.apache.storm.flux.Flux --local my_config.yaml
++
++```
++
++### Command line options
++```
++usage: storm jar <my_topology_uber_jar.jar> org.apache.storm.flux.Flux
++ [options] <topology-config.yaml>
++ -d,--dry-run Do not run or deploy the topology. Just
++ build, validate, and print information about
++ the topology.
++ -e,--env-filter Perform environment variable substitution.
++ Keys identified with `${ENV-[NAME]}`
++ will be replaced with the corresponding
++ `NAME` environment value
++ -f,--filter <file> Perform property substitution. Use the
++ specified file as a source of properties,
++ and replace keys identified with {$[property
++ name]} with the value defined in the
++ properties file.
++ -i,--inactive Deploy the topology, but do not activate it.
++ -l,--local Run the topology in local mode.
++ -n,--no-splash Suppress the printing of the splash screen.
++ -q,--no-detail Suppress the printing of topology details.
++ -r,--remote Deploy the topology to a remote cluster.
++ -R,--resource Treat the supplied path as a classpath
++ resource instead of a file.
++ -s,--sleep <ms> When running locally, the amount of time to
++ sleep (in ms.) before killing the topology
++ and shutting down the local cluster.
++ -z,--zookeeper <host:port> When running in local mode, use the
++ ZooKeeper at the specified <host>:<port>
++ instead of the in-process ZooKeeper.
++ (requires Storm 0.9.3 or later)
++```
++
++**NOTE:** Flux tries to avoid command line switch collision with the `storm` command, and allows any other command line
++switches to pass through to the `storm` command.
++
++For example, you can use the `storm` command switch `-c` to override a topology configuration property. The following
++example command will run Flux and override the `nimbus.host` configuration:
++
++```bash
++storm jar myTopology-0.1.0-SNAPSHOT.jar org.apache.storm.flux.Flux --remote my_config.yaml -c nimbus.host=localhost
++```
++
++### Sample output
++```
++███████╗██╗ ██╗ ██╗██╗ ██╗
++██╔════╝██║ ██║ ██║╚██╗██╔╝
++█████╗ ██║ ██║ ██║ ╚███╔╝
++██╔══╝ ██║ ██║ ██║ ██╔██╗
++██║ ███████╗╚██████╔╝██╔╝ ██╗
++╚═╝ ╚══════╝ ╚═════╝ ╚═╝ ╚═╝
+++- Apache Storm -+
+++- data FLow User eXperience -+
++Version: 0.3.0
++Parsing file: /Users/hsimpson/Projects/donut_domination/storm/shell_test.yaml
++---------- TOPOLOGY DETAILS ----------
++Name: shell-topology
++--------------- SPOUTS ---------------
++sentence-spout[1](org.apache.storm.flux.spouts.GenericShellSpout)
++---------------- BOLTS ---------------
++splitsentence[1](org.apache.storm.flux.bolts.GenericShellBolt)
++log[1](org.apache.storm.flux.wrappers.bolts.LogInfoBolt)
++count[1](backtype.storm.testing.TestWordCounter)
++--------------- STREAMS ---------------
++sentence-spout --SHUFFLE--> splitsentence
++splitsentence --FIELDS--> count
++count --SHUFFLE--> log
++--------------------------------------
++Submitting topology: 'shell-topology' to remote cluster...
++```
++
++## YAML Configuration
++Flux topologies are defined in a YAML file that describes a topology. A Flux topology
++definition consists of the following:
++
++ 1. A topology name
++ 2. A list of topology "components" (named Java objects that will be made available in the environment)
++ 3. **EITHER** (A DSL topology definition):
++ * A list of spouts, each identified by a unique ID
++ * A list of bolts, each identified by a unique ID
++ * A list of "stream" objects representing a flow of tuples between spouts and bolts
++ 4. **OR** (A JVM class that can produce a `backtype.storm.generated.StormTopology` instance):
++ * A `topologySource` definition.
++
++
++
++For example, here is a simple definition of a wordcount topology using the YAML DSL:
++
++```yaml
++name: "yaml-topology"
++config:
++ topology.workers: 1
++
++# spout definitions
++spouts:
++ - id: "spout-1"
++ className: "backtype.storm.testing.TestWordSpout"
++ parallelism: 1
++
++# bolt definitions
++bolts:
++ - id: "bolt-1"
++ className: "backtype.storm.testing.TestWordCounter"
++ parallelism: 1
++ - id: "bolt-2"
++ className: "org.apache.storm.flux.wrappers.bolts.LogInfoBolt"
++ parallelism: 1
++
++#stream definitions
++streams:
++ - name: "spout-1 --> bolt-1" # name isn't used (placeholder for logging, UI, etc.)
++ from: "spout-1"
++ to: "bolt-1"
++ grouping:
++ type: FIELDS
++ args: ["word"]
++
++ - name: "bolt-1 --> bolt2"
++ from: "bolt-1"
++ to: "bolt-2"
++ grouping:
++ type: SHUFFLE
++
++
++```
++## Property Substitution/Filtering
++It's common for developers to want to easily switch between configurations, for example switching deployment between
++a development environment and a production environment. This can be accomplished by using separate YAML configuration
++files, but that approach would lead to unnecessary duplication, especially in situations where the Storm topology
++does not change, but configuration settings such as host names, ports, and parallelism parameters do.
++
++For this case, Flux offers properties filtering to allow you to externalize values to a `.properties` file and have
++them substituted before the `.yaml` file is parsed.
++
++To enable property filtering, use the `--filter` command line option and specify a `.properties` file. For example,
++if you invoked flux like so:
++
++```bash
++storm jar myTopology-0.1.0-SNAPSHOT.jar org.apache.storm.flux.Flux --local my_config.yaml --filter dev.properties
++```
++With the following `dev.properties` file:
++
++```properties
++kafka.zookeeper.hosts: localhost:2181
++```
++
++You would then be able to reference those properties by key in your `.yaml` file using `${}` syntax:
++
++```yaml
++ - id: "zkHosts"
++ className: "storm.kafka.ZkHosts"
++ constructorArgs:
++ - "${kafka.zookeeper.hosts}"
++```
++
++In this case, Flux would replace `${kafka.zookeeper.hosts}` with `localhost:2181` before parsing the YAML contents.
++
++### Environment Variable Substitution/Filtering
++Flux also allows environment variable substitution. For example, if an environment variable named `ZK_HOSTS` is defined,
++you can reference it in a Flux YAML file with the following syntax:
++
++```
++${ENV-ZK_HOSTS}
++```
++
++## Components
++Components are essentially named object instances that are made available as configuration options for spouts and
++bolts. If you are familiar with the Spring framework, components are roughly analogous to Spring beans.
++
++Every component is identified, at a minimum, by a unique identifier (String) and a class name (String). For example,
++the following will make an instance of the `storm.kafka.StringScheme` class available as a reference under the key
++`"stringScheme"` . This assumes the `storm.kafka.StringScheme` has a default constructor.
++
++```yaml
++components:
++ - id: "stringScheme"
++ className: "storm.kafka.StringScheme"
++```
++
++### Constructor Arguments, References, Properties and Configuration Methods
++
++#### Constructor Arguments
++Arguments to a class constructor can be configured by adding a `constructorArgs` element to a component.
++`constructorArgs` is a list of objects that will be passed to the class' constructor. The following example creates an
++object by calling the constructor that takes a single string as an argument:
++
++```yaml
++ - id: "zkHosts"
++ className: "storm.kafka.ZkHosts"
++ constructorArgs:
++ - "localhost:2181"
++```
++
++#### References
++Each component instance is identified by a unique id that allows it to be used/reused by other components. To
++reference an existing component, you specify the id of the component with the `ref` tag.
++
++In the following example, a component with the id `"stringScheme"` is created, and later referenced, as an argument
++to another component's constructor:
++
++```yaml
++components:
++ - id: "stringScheme"
++ className: "storm.kafka.StringScheme"
++
++ - id: "stringMultiScheme"
++ className: "backtype.storm.spout.SchemeAsMultiScheme"
++ constructorArgs:
++ - ref: "stringScheme" # component with id "stringScheme" must be declared above.
++```
++**N.B.:** References can only be used after (below) the object they point to has been declared.
++
++#### Properties
++In addition to calling constructors with different arguments, Flux also allows you to configure components using
++JavaBean-like setter methods and fields declared as `public`:
++
++```yaml
++ - id: "spoutConfig"
++ className: "storm.kafka.SpoutConfig"
++ constructorArgs:
++ # brokerHosts
++ - ref: "zkHosts"
++ # topic
++ - "myKafkaTopic"
++ # zkRoot
++ - "/kafkaSpout"
++ # id
++ - "myId"
++ properties:
++ - name: "forceFromStart"
++ value: true
++ - name: "scheme"
++ ref: "stringMultiScheme"
++```
++
++In the example above, the `properties` declaration will cause Flux to look for a public method in the `SpoutConfig` with
++the signature `setForceFromStart(boolean b)` and attempt to invoke it. If a setter method is not found, Flux will then
++look for a public instance variable with the name `forceFromStart` and attempt to set its value.
++
++References may also be used as property values.
++
++#### Configuration Methods
++Conceptually, configuration methods are similar to Properties and Constructor Args -- they allow you to invoke an
++arbitrary method on an object after it is constructed. Configuration methods are useful for working with classes that
++don't expose JavaBean methods or have constructors that can fully configure the object. Common examples include classes
++that use the builder pattern for configuration/composition.
++
++The following YAML example creates a bolt and configures it by calling several methods:
++
++```yaml
++bolts:
++ - id: "bolt-1"
++ className: "org.apache.storm.flux.test.TestBolt"
++ parallelism: 1
++ configMethods:
++ - name: "withFoo"
++ args:
++ - "foo"
++ - name: "withBar"
++ args:
++ - "bar"
++ - name: "withFooBar"
++ args:
++ - "foo"
++ - "bar"
++```
++
++The signatures of the corresponding methods are as follows:
++
++```java
++ public void withFoo(String foo);
++ public void withBar(String bar);
++ public void withFooBar(String foo, String bar);
++```
++
++Arguments passed to configuration methods work much the same way as constructor arguments, and support references as
++well.
++
++### Using Java `enum`s in Constructor Arguments, References, Properties and Configuration Methods
++You can easily use Java `enum` values as arguments in a Flux YAML file, simply by referencing the name of the `enum`.
++
++For example, Storm's HDFS module includes the following `enum` definition (simplified for brevity):
++
++```java
++public static enum Units {
++ KB, MB, GB, TB
++}
++```
++
++And the `org.apache.storm.hdfs.bolt.rotation.FileSizeRotationPolicy` class has the following constructor:
++
++```java
++public FileSizeRotationPolicy(float count, Units units)
++
++```
++The following Flux `component` definition could be used to call the constructor:
++
++```yaml
++ - id: "rotationPolicy"
++ className: "org.apache.storm.hdfs.bolt.rotation.FileSizeRotationPolicy"
++ constructorArgs:
++ - 5.0
++ - MB
++```
++
++The above definition is functionally equivalent to the following Java code:
++
++```java
++// rotate files when they reach 5MB
++FileRotationPolicy rotationPolicy = new FileSizeRotationPolicy(5.0f, Units.MB);
++```
++
++## Topology Config
++The `config` section is simply a map of Storm topology configuration parameters that will be passed to the
++`backtype.storm.StormSubmitter` as an instance of the `backtype.storm.Config` class:
++
++```yaml
++config:
++ topology.workers: 4
++ topology.max.spout.pending: 1000
++ topology.message.timeout.secs: 30
++```
++
++# Existing Topologies
++If you have existing Storm topologies, you can still use Flux to deploy/run/test them. This feature allows you to
++leverage Flux Constructor Arguments, References, Properties, and Topology Config declarations for existing topology
++classes.
++
++The easiest way to use an existing topology class is to define
++a `getTopology()` instance method with one of the following signatures:
++
++```java
++public StormTopology getTopology(Map<String, Object> config)
++```
++or:
++
++```java
++public StormTopology getTopology(Config config)
++```
++
++You could then use the following YAML to configure your topology:
++
++```yaml
++name: "existing-topology"
++topologySource:
++ className: "org.apache.storm.flux.test.SimpleTopology"
++```
++
++If the class you would like to use as a topology source has a different method name (i.e. not `getTopology`), you can
++override it:
++
++```yaml
++name: "existing-topology"
++topologySource:
++ className: "org.apache.storm.flux.test.SimpleTopology"
++ methodName: "getTopologyWithDifferentMethodName"
++```
++
++__N.B.:__ The specified method must accept a single argument of type `java.util.Map<String, Object>` or
++`backtype.storm.Config`, and return a `backtype.storm.generated.StormTopology` object.
++
++# YAML DSL
++## Spouts and Bolts
++Spouts and Bolts are configured in their own respective section of the YAML configuration. Spout and Bolt definitions
++are extensions to the `component` definition that add a `parallelism` parameter that sets the parallelism for a
++component when the topology is deployed.
++
++Because spout and bolt definitions extend `component` they support constructor arguments, references, and properties as
++well.
++
++Shell spout example:
++
++```yaml
++spouts:
++ - id: "sentence-spout"
++ className: "org.apache.storm.flux.spouts.GenericShellSpout"
++ # shell spout constructor takes 2 arguments: String[], String[]
++ constructorArgs:
++ # command line
++ - ["node", "randomsentence.js"]
++ # output fields
++ - ["word"]
++ parallelism: 1
++```
++
++Kafka spout example:
++
++```yaml
++components:
++ - id: "stringScheme"
++ className: "storm.kafka.StringScheme"
++
++ - id: "stringMultiScheme"
++ className: "backtype.storm.spout.SchemeAsMultiScheme"
++ constructorArgs:
++ - ref: "stringScheme"
++
++ - id: "zkHosts"
++ className: "storm.kafka.ZkHosts"
++ constructorArgs:
++ - "localhost:2181"
++
++# Alternative kafka config
++# - id: "kafkaConfig"
++# className: "storm.kafka.KafkaConfig"
++# constructorArgs:
++# # brokerHosts
++# - ref: "zkHosts"
++# # topic
++# - "myKafkaTopic"
++# # clientId (optional)
++# - "myKafkaClientId"
++
++ - id: "spoutConfig"
++ className: "storm.kafka.SpoutConfig"
++ constructorArgs:
++ # brokerHosts
++ - ref: "zkHosts"
++ # topic
++ - "myKafkaTopic"
++ # zkRoot
++ - "/kafkaSpout"
++ # id
++ - "myId"
++ properties:
++ - name: "forceFromStart"
++ value: true
++ - name: "scheme"
++ ref: "stringMultiScheme"
++
++config:
++ topology.workers: 1
++
++# spout definitions
++spouts:
++ - id: "kafka-spout"
++ className: "storm.kafka.KafkaSpout"
++ constructorArgs:
++ - ref: "spoutConfig"
++
++```
++
++Bolt Examples:
++
++```yaml
++# bolt definitions
++bolts:
++ - id: "splitsentence"
++ className: "org.apache.storm.flux.bolts.GenericShellBolt"
++ constructorArgs:
++ # command line
++ - ["python", "splitsentence.py"]
++ # output fields
++ - ["word"]
++ parallelism: 1
++ # ...
++
++ - id: "log"
++ className: "org.apache.storm.flux.wrappers.bolts.LogInfoBolt"
++ parallelism: 1
++ # ...
++
++ - id: "count"
++ className: "backtype.storm.testing.TestWordCounter"
++ parallelism: 1
++ # ...
++```
++## Streams and Stream Groupings
++Streams in Flux are represented as a list of connections (Graph edges, data flow, etc.) between the Spouts and Bolts in
++a topology, with an associated Grouping definition.
++
++A Stream definition has the following properties:
++
++**`name`:** A name for the connection (optional, currently unused)
++
++**`from`:** The `id` of a Spout or Bolt that is the source (publisher)
++
++**`to`:** The `id` of a Spout or Bolt that is the destination (subscriber)
++
++**`grouping`:** The stream grouping definition for the Stream
++
++A Grouping definition has the following properties:
++
++**`type`:** The type of grouping. One of `ALL`,`CUSTOM`,`DIRECT`,`SHUFFLE`,`LOCAL_OR_SHUFFLE`,`FIELDS`,`GLOBAL`, or `NONE`.
++
++**`streamId`:** The Storm stream ID (Optional. If unspecified will use the default stream)
++
++**`args`:** For the `FIELDS` grouping, a list of field names.
++
++**`customClass`** For the `CUSTOM` grouping, a definition of custom grouping class instance
++
++The `streams` definition example below sets up a topology with the following wiring:
++
++```
++ kafka-spout --> splitsentence --> count --> log
++```
++
++
++```yaml
++#stream definitions
++# stream definitions define connections between spouts and bolts.
++# note that such connections can be cyclical
++# custom stream groupings are also supported
++
++streams:
++ - name: "kafka --> split" # name isn't used (placeholder for logging, UI, etc.)
++ from: "kafka-spout"
++ to: "splitsentence"
++ grouping:
++ type: SHUFFLE
++
++ - name: "split --> count"
++ from: "splitsentence"
++ to: "count"
++ grouping:
++ type: FIELDS
++ args: ["word"]
++
++ - name: "count --> log"
++ from: "count"
++ to: "log"
++ grouping:
++ type: SHUFFLE
++```
++
++### Custom Stream Groupings
++Custom stream groupings are defined by setting the grouping type to `CUSTOM` and defining a `customClass` parameter
++that tells Flux how to instantiate the custom class. The `customClass` definition extends `component`, so it supports
++constructor arguments, references, and properties as well.
++
++The example below creates a Stream with an instance of the `backtype.storm.testing.NGrouping` custom stream grouping
++class.
++
++```yaml
++ - name: "bolt-1 --> bolt2"
++ from: "bolt-1"
++ to: "bolt-2"
++ grouping:
++ type: CUSTOM
++ customClass:
++ className: "backtype.storm.testing.NGrouping"
++ constructorArgs:
++ - 1
++```
++
++## Includes and Overrides
++Flux allows you to include the contents of other YAML files, and have them treated as though they were defined in the
++same file. Includes may be either files, or classpath resources.
++
++Includes are specified as a list of maps:
++
++```yaml
++includes:
++ - resource: false
++ file: "src/test/resources/configs/shell_test.yaml"
++ override: false
++```
++
++If the `resource` property is set to `true`, the include will be loaded as a classpath resource from the value of the
++`file` attribute, otherwise it will be treated as a regular file.
++
++The `override` property controls how includes affect the values defined in the current file. If `override` is set to
++`true`, values in the included file will replace values in the current file being parsed. If `override` is set to
++`false`, values in the current file being parsed will take precedence, and the parser will refuse to replace them.
++
++**N.B.:** Includes are not yet recursive. Includes from included files will be ignored.
++
++
++## Basic Word Count Example
++
++This example uses a spout implemented in JavaScript, a bolt implemented in Python, and a bolt implemented in Java
++
++Topology YAML config:
++
++```yaml
++---
++name: "shell-topology"
++config:
++ topology.workers: 1
++
++# spout definitions
++spouts:
++ - id: "sentence-spout"
++ className: "org.apache.storm.flux.spouts.GenericShellSpout"
++ # shell spout constructor takes 2 arguments: String[], String[]
++ constructorArgs:
++ # command line
++ - ["node", "randomsentence.js"]
++ # output fields
++ - ["word"]
++ parallelism: 1
++
++# bolt definitions
++bolts:
++ - id: "splitsentence"
++ className: "org.apache.storm.flux.bolts.GenericShellBolt"
++ constructorArgs:
++ # command line
++ - ["python", "splitsentence.py"]
++ # output fields
++ - ["word"]
++ parallelism: 1
++
++ - id: "log"
++ className: "org.apache.storm.flux.wrappers.bolts.LogInfoBolt"
++ parallelism: 1
++
++ - id: "count"
++ className: "backtype.storm.testing.TestWordCounter"
++ parallelism: 1
++
++#stream definitions
++# stream definitions define connections between spouts and bolts.
++# note that such connections can be cyclical
++# custom stream groupings are also supported
++
++streams:
++ - name: "spout --> split" # name isn't used (placeholder for logging, UI, etc.)
++ from: "sentence-spout"
++ to: "splitsentence"
++ grouping:
++ type: SHUFFLE
++
++ - name: "split --> count"
++ from: "splitsentence"
++ to: "count"
++ grouping:
++ type: FIELDS
++ args: ["word"]
++
++ - name: "count --> log"
++ from: "count"
++ to: "log"
++ grouping:
++ type: SHUFFLE
++```
++
++
++## Micro-Batching (Trident) API Support
++Currently, the Flux YAML DSL only supports the Core Storm API, but support for Storm's micro-batching API is planned.
++
++To use Flux with a Trident topology, define a topology getter method and reference it in your YAML config:
++
++```yaml
++name: "my-trident-topology"
++
++config:
++ topology.workers: 1
++
++topologySource:
++ className: "org.apache.storm.flux.test.TridentTopologySource"
++ # Flux will look for "getTopology", this will override that.
++ methodName: "getTopologyWithDifferentMethodName"
++```
++
++## Author
++P. Taylor Goetz
++
++## Contributors
++
++
++## Contributing
++
++Contributions in any form are more than welcome.
++
++The intent of this project is that it will be donated to Apache Storm.
++
++By offering any contributions to this project, you should be willing and able to submit an
++[Apache ICLA](http://www.apache.org/licenses/icla.txt), if you have not done so already.
http://git-wip-us.apache.org/repos/asf/storm/blob/b21a98dd/external/flux/flux-core/pom.xml
----------------------------------------------------------------------
diff --cc external/flux/flux-core/pom.xml
index 0000000,0000000..600613d
new file mode 100644
--- /dev/null
+++ b/external/flux/flux-core/pom.xml
@@@ -1,0 -1,0 +1,94 @@@
++<?xml version="1.0" encoding="UTF-8"?>
++<!--
++ Licensed to the Apache Software Foundation (ASF) under one or more
++ contributor license agreements. See the NOTICE file distributed with
++ this work for additional information regarding copyright ownership.
++ The ASF licenses this file to You under the Apache License, Version 2.0
++ (the "License"); you may not use this file except in compliance with
++ the License. You may obtain a copy of the License at
++
++ http://www.apache.org/licenses/LICENSE-2.0
++
++ Unless required by applicable law or agreed to in writing, software
++ distributed under the License is distributed on an "AS IS" BASIS,
++ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
++ See the License for the specific language governing permissions and
++ limitations under the License.
++-->
++<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
++ <modelVersion>4.0.0</modelVersion>
++
++ <parent>
++ <groupId>com.github.ptgoetz</groupId>
++ <artifactId>flux</artifactId>
++ <version>0.3.1-SNAPSHOT</version>
++ <relativePath>../pom.xml</relativePath>
++ </parent>
++
++ <groupId>com.github.ptgoetz</groupId>
++ <artifactId>flux-core</artifactId>
++ <packaging>jar</packaging>
++
++ <name>flux-core</name>
++ <url>https://github.com/ptgoetz/flux</url>
++
++ <dependencies>
++ <dependency>
++ <groupId>com.github.ptgoetz</groupId>
++ <artifactId>flux-wrappers</artifactId>
++ <version>${project.version}</version>
++ </dependency>
++ <dependency>
++ <groupId>org.apache.storm</groupId>
++ <artifactId>storm-kafka</artifactId>
++ <version>${storm.version}</version>
++ <scope>test</scope>
++ </dependency>
++ <dependency>
++ <groupId>org.apache.storm</groupId>
++ <artifactId>storm-hdfs</artifactId>
++ <version>${storm.version}</version>
++ <scope>test</scope>
++ </dependency>
++ <dependency>
++ <groupId>org.apache.storm</groupId>
++ <artifactId>storm-hbase</artifactId>
++ <version>${storm.version}</version>
++ <scope>test</scope>
++ </dependency>
++ </dependencies>
++ <build>
++ <resources>
++ <resource>
++ <directory>src/main/resources</directory>
++ <filtering>true</filtering>
++ </resource>
++ </resources>
++ <plugins>
++ <plugin>
++ <groupId>org.apache.maven.plugins</groupId>
++ <artifactId>maven-shade-plugin</artifactId>
++ <version>1.4</version>
++ <configuration>
++ <createDependencyReducedPom>true</createDependencyReducedPom>
++ </configuration>
++ <executions>
++ <execution>
++ <phase>package</phase>
++ <goals>
++ <goal>shade</goal>
++ </goals>
++ <configuration>
++ <transformers>
++ <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer" />
++ <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
++ <mainClass>org.apache.storm.flux.Flux</mainClass>
++ </transformer>
++ </transformers>
++ </configuration>
++ </execution>
++ </executions>
++ </plugin>
++ </plugins>
++ </build>
++</project>
http://git-wip-us.apache.org/repos/asf/storm/blob/b21a98dd/external/flux/flux-core/src/main/java/org/apache/storm/flux/Flux.java
----------------------------------------------------------------------
diff --cc external/flux/flux-core/src/main/java/org/apache/storm/flux/Flux.java
index 0000000,0000000..6300631
new file mode 100644
--- /dev/null
+++ b/external/flux/flux-core/src/main/java/org/apache/storm/flux/Flux.java
@@@ -1,0 -1,0 +1,263 @@@
++/*
++ * Licensed to the Apache Software Foundation (ASF) under one
++ * or more contributor license agreements. See the NOTICE file
++ * distributed with this work for additional information
++ * regarding copyright ownership. The ASF licenses this file
++ * to you under the Apache License, Version 2.0 (the
++ * "License"); you may not use this file except in compliance
++ * with the License. You may obtain a copy of the License at
++ *
++ * http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing, software
++ * distributed under the License is distributed on an "AS IS" BASIS,
++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
++ * See the License for the specific language governing permissions and
++ * limitations under the License.
++ */
++package org.apache.storm.flux;
++
++import backtype.storm.Config;
++import backtype.storm.LocalCluster;
++import backtype.storm.StormSubmitter;
++import backtype.storm.generated.StormTopology;
++import backtype.storm.generated.SubmitOptions;
++import backtype.storm.generated.TopologyInitialStatus;
++import backtype.storm.utils.Utils;
++import org.apache.commons.cli.*;
++import org.apache.storm.flux.model.*;
++import org.apache.storm.flux.parser.FluxParser;
++import org.slf4j.Logger;
++import org.slf4j.LoggerFactory;
++
++import java.io.*;
++import java.util.Map;
++import java.util.Properties;
++
++/**
++ * Flux entry point.
++ *
++ */
++public class Flux {
++ private static final Logger LOG = LoggerFactory.getLogger(Flux.class);
++
++ private static final Long DEFAULT_LOCAL_SLEEP_TIME = 60000l;
++
++ private static final Long DEFAULT_ZK_PORT = 2181l;
++
++ private static final String OPTION_LOCAL = "local";
++ private static final String OPTION_REMOTE = "remote";
++ private static final String OPTION_RESOURCE = "resource";
++ private static final String OPTION_SLEEP = "sleep";
++ private static final String OPTION_DRY_RUN = "dry-run";
++ private static final String OPTION_NO_DETAIL = "no-detail";
++ private static final String OPTION_NO_SPLASH = "no-splash";
++ private static final String OPTION_INACTIVE = "inactive";
++ private static final String OPTION_ZOOKEEPER = "zookeeper";
++ private static final String OPTION_FILTER = "filter";
++ private static final String OPTION_ENV_FILTER = "env-filter";
++
++ public static void main(String[] args) throws Exception {
++ Options options = new Options();
++
++ options.addOption(option(0, "l", OPTION_LOCAL, "Run the topology in local mode."));
++
++ options.addOption(option(0, "r", OPTION_REMOTE, "Deploy the topology to a remote cluster."));
++
++ options.addOption(option(0, "R", OPTION_RESOURCE, "Treat the supplied path as a classpath resource instead of a file."));
++
++ options.addOption(option(1, "s", OPTION_SLEEP, "ms", "When running locally, the amount of time to sleep (in ms.) " +
++ "before killing the topology and shutting down the local cluster."));
++
++ options.addOption(option(0, "d", OPTION_DRY_RUN, "Do not run or deploy the topology. Just build, validate, " +
++ "and print information about the topology."));
++
++ options.addOption(option(0, "q", OPTION_NO_DETAIL, "Suppress the printing of topology details."));
++
++ options.addOption(option(0, "n", OPTION_NO_SPLASH, "Suppress the printing of the splash screen."));
++
++ options.addOption(option(0, "i", OPTION_INACTIVE, "Deploy the topology, but do not activate it."));
++
++ options.addOption(option(1, "z", OPTION_ZOOKEEPER, "host:port", "When running in local mode, use the ZooKeeper at the " +
++ "specified <host>:<port> instead of the in-process ZooKeeper. (requires Storm 0.9.3 or later)"));
++
++ options.addOption(option(1, "f", OPTION_FILTER, "file", "Perform property substitution. Use the specified file " +
++ "as a source of properties, and replace keys identified with {$[property name]} with the value defined " +
++ "in the properties file."));
++
++ options.addOption(option(0, "e", OPTION_ENV_FILTER, "Perform environment variable substitution. Replace keys" +
++ "identified with `${ENV-[NAME]}` will be replaced with the corresponding `NAME` environment value"));
++
++ CommandLineParser parser = new BasicParser();
++ CommandLine cmd = parser.parse(options, args);
++
++ if (cmd.getArgs().length != 1) {
++ usage(options);
++ System.exit(1);
++ }
++ runCli(cmd);
++ }
++
++ private static Option option(int argCount, String shortName, String longName, String description){
++ return option(argCount, shortName, longName, longName, description);
++ }
++
++ private static Option option(int argCount, String shortName, String longName, String argName, String description){
++ Option option = OptionBuilder.hasArgs(argCount)
++ .withArgName(argName)
++ .withLongOpt(longName)
++ .withDescription(description)
++ .create(shortName);
++ return option;
++ }
++
++ private static void usage(Options options) {
++ HelpFormatter formatter = new HelpFormatter();
++ formatter.printHelp("storm jar <my_topology_uber_jar.jar> " +
++ Flux.class.getName() +
++ " [options] <topology-config.yaml>", options);
++ }
++
++ private static void runCli(CommandLine cmd)throws Exception {
++ if(!cmd.hasOption(OPTION_NO_SPLASH)) {
++ printSplash();
++ }
++
++ boolean dumpYaml = cmd.hasOption("dump-yaml");
++
++ TopologyDef topologyDef = null;
++ String filePath = (String)cmd.getArgList().get(0);
++
++ // TODO conditionally load properties from a file our resource
++ String filterProps = null;
++ if(cmd.hasOption(OPTION_FILTER)){
++ filterProps = cmd.getOptionValue(OPTION_FILTER);
++ }
++
++
++ boolean envFilter = cmd.hasOption(OPTION_ENV_FILTER);
++ if(cmd.hasOption(OPTION_RESOURCE)){
++ printf("Parsing classpath resource: %s", filePath);
++ topologyDef = FluxParser.parseResource(filePath, dumpYaml, true, filterProps, envFilter);
++ } else {
++ printf("Parsing file: %s",
++ new File(filePath).getAbsolutePath());
++ topologyDef = FluxParser.parseFile(filePath, dumpYaml, true, filterProps, envFilter);
++ }
++
++
++ String topologyName = topologyDef.getName();
++ // merge contents of `config` into topology config
++ Config conf = FluxBuilder.buildConfig(topologyDef);
++ ExecutionContext context = new ExecutionContext(topologyDef, conf);
++ StormTopology topology = FluxBuilder.buildTopology(context);
++
++ if(!cmd.hasOption(OPTION_NO_DETAIL)){
++ printTopologyInfo(context);
++ }
++
++ if(!cmd.hasOption(OPTION_DRY_RUN)) {
++ if (cmd.hasOption(OPTION_REMOTE)) {
++ LOG.info("Running remotely...");
++ try {
++ // should the topology be active or inactive
++ SubmitOptions submitOptions = null;
++ if(cmd.hasOption(OPTION_INACTIVE)){
++ LOG.info("Deploying topology in an INACTIVE state...");
++ submitOptions = new SubmitOptions(TopologyInitialStatus.INACTIVE);
++ } else {
++ LOG.info("Deploying topology in an ACTIVE state...");
++ submitOptions = new SubmitOptions(TopologyInitialStatus.ACTIVE);
++ }
++ StormSubmitter.submitTopology(topologyName, conf, topology, submitOptions, null);
++ } catch (Exception e) {
++ LOG.warn("Unable to deploy topology to remote cluster.", e);
++ }
++ } else {
++ LOG.info("Running in local mode...");
++
++ String sleepStr = cmd.getOptionValue(OPTION_SLEEP);
++ Long sleepTime = DEFAULT_LOCAL_SLEEP_TIME;
++ if (sleepStr != null) {
++ sleepTime = Long.parseLong(sleepStr);
++ }
++ LOG.debug("Sleep time: {}", sleepTime);
++ LocalCluster cluster = null;
++
++ // in-process or external zookeeper
++ if(cmd.hasOption(OPTION_ZOOKEEPER)){
++ String zkStr = cmd.getOptionValue(OPTION_ZOOKEEPER);
++ LOG.info("Using ZooKeeper at '{}' instead of in-process one.", zkStr);
++ long zkPort = DEFAULT_ZK_PORT;
++ String zkHost = null;
++ if(zkStr.contains(":")){
++ String[] hostPort = zkStr.split(":");
++ zkHost = hostPort[0];
++ zkPort = hostPort.length > 1 ? Long.parseLong(hostPort[1]) : DEFAULT_ZK_PORT;
++
++ } else {
++ zkHost = zkStr;
++ }
++ // the following constructor is only available in 0.9.3 and later
++ try {
++ cluster = new LocalCluster(zkHost, zkPort);
++ } catch (NoSuchMethodError e){
++ LOG.error("The --zookeeper option can only be used with Apache Storm 0.9.3 and later.");
++ System.exit(1);
++ }
++ } else {
++ cluster = new LocalCluster();
++ }
++ cluster.submitTopology(topologyName, conf, topology);
++
++ Utils.sleep(sleepTime);
++ cluster.killTopology(topologyName);
++ cluster.shutdown();
++ }
++ }
++ }
++
++ static void printTopologyInfo(ExecutionContext ctx){
++ TopologyDef t = ctx.getTopologyDef();
++ if(t.isDslTopology()) {
++ print("---------- TOPOLOGY DETAILS ----------");
++
++ printf("Topology Name: %s", t.getName());
++ print("--------------- SPOUTS ---------------");
++ for (SpoutDef s : t.getSpouts()) {
++ printf("%s [%d] (%s)", s.getId(), s.getParallelism(), s.getClassName());
++ }
++ print("---------------- BOLTS ---------------");
++ for (BoltDef b : t.getBolts()) {
++ printf("%s [%d] (%s)", b.getId(), b.getParallelism(), b.getClassName());
++ }
++
++ print("--------------- STREAMS ---------------");
++ for (StreamDef sd : t.getStreams()) {
++ printf("%s --%s--> %s", sd.getFrom(), sd.getGrouping().getType(), sd.getTo());
++ }
++ print("--------------------------------------");
++ }
++ }
++
++ // save a little typing
++ private static void printf(String format, Object... args){
++ print(String.format(format, args));
++ }
++
++ private static void print(String string){
++ System.out.println(string);
++ }
++
++ private static void printSplash() throws IOException {
++ // banner
++ InputStream is = Flux.class.getResourceAsStream("/splash.txt");
++ if(is != null){
++ BufferedReader br = new BufferedReader(new InputStreamReader(is));
++ String line = null;
++ while((line = br.readLine()) != null){
++ System.out.println(line);
++ }
++ }
++ }
++}