You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by ka...@apache.org on 2017/04/05 23:19:13 UTC
[10/23] storm git commit: STORM-2453 Move non-connectors into the top
directory
http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/flux/README.md
----------------------------------------------------------------------
diff --git a/flux/README.md b/flux/README.md
new file mode 100644
index 0000000..206ae22
--- /dev/null
+++ b/flux/README.md
@@ -0,0 +1,872 @@
+# flux
+A framework for creating and deploying Apache Storm streaming computations with less friction.
+
+## Definition
+**flux** |fləks| _noun_
+
+1. The action or process of flowing or flowing out
+2. Continuous change
+3. In physics, the rate of flow of a fluid, radiant energy, or particles across a given area
+4. A substance mixed with a solid to lower its melting point
+
+## Rationale
+Bad things happen when configuration is hard-coded. No one should have to recompile or repackage an application in
+order to change configuration.
+
+## About
+Flux is a framework and set of utilities that make defining and deploying Apache Storm topologies less painful and
+developer-intensive.
+
+Have you ever found yourself repeating this pattern?:
+
+```java
+
+public static void main(String[] args) throws Exception {
+ // logic to determine if we're running locally or not...
+ // create necessary config options...
+ boolean runLocal = shouldRunLocal();
+ if(runLocal){
+ LocalCluster cluster = new LocalCluster();
+ cluster.submitTopology(name, conf, topology);
+ } else {
+ StormSubmitter.submitTopology(name, conf, topology);
+ }
+}
+```
+
+Wouldn't something like this be easier:
+
+```bash
+storm jar mytopology.jar org.apache.storm.flux.Flux --local config.yaml
+```
+
+or:
+
+```bash
+storm jar mytopology.jar org.apache.storm.flux.Flux --remote config.yaml
+```
+
+Another pain point often mentioned is the fact that the wiring for a Topology graph is often tied up in Java code,
+and that any changes require recompilation and repackaging of the topology jar file. Flux aims to alleviate that
+pain by allowing you to package all your Storm components in a single jar, and use an external text file to define
+the layout and configuration of your topologies.
+
+## Features
+
+ * Easily configure and deploy Storm topologies (Both Storm core and Microbatch API) without embedding configuration
+ in your topology code
+ * Support for existing topology code (see below)
+ * Define Storm Core API (Spouts/Bolts) using a flexible YAML DSL
+ * YAML DSL support for most Storm components (storm-kafka, storm-hdfs, storm-hbase, etc.)
+ * Convenient support for multi-lang components
+ * External property substitution/filtering for easily switching between configurations/environments (similar to Maven-style
+ `${variable.name}` substitution)
+
+## Usage
+
+To use Flux, add it as a dependency and package all your Storm components in a fat jar, then create a YAML document
+to define your topology (see below for YAML configuration options).
+
+### Building from Source
+The easiest way to use Flux is to add it as a Maven dependency in your project as described below.
+
+If you would like to build Flux from source and run the unit/integration tests, you will need the following installed
+on your system:
+
+* Python 2.6.x or later
+* Node.js 0.10.x or later
+
+#### Building with unit tests enabled:
+
+```
+mvn clean install
+```
+
+#### Building with unit tests disabled:
+If you would like to build Flux without installing Python or Node.js you can simply skip the unit tests:
+
+```
+mvn clean install -DskipTests=true
+```
+
+Note that if you plan on using Flux to deploy topologies to a remote cluster, you will still need to have Python
+installed since it is required by Apache Storm.
+
+
+#### Building with integration tests enabled:
+
+```
+mvn clean install -DskipIntegration=false
+```
+
+
+### Packaging with Maven
+To enable Flux for your Storm components, you need to add it as a dependency such that it's included in the Storm
+topology jar. This can be accomplished with the Maven shade plugin (preferred) or the Maven assembly plugin (not
+recommended).
+
+#### Flux Maven Dependency
+The current version of Flux is available in Maven Central at the following coordinates:
+```xml
+<dependency>
+ <groupId>org.apache.storm</groupId>
+ <artifactId>flux-core</artifactId>
+ <version>${storm.version}</version>
+</dependency>
+```
+
+Using shell spouts and bolts requires additional Flux Wrappers library:
+```xml
+<dependency>
+ <groupId>org.apache.storm</groupId>
+ <artifactId>flux-wrappers</artifactId>
+ <version>${storm.version}</version>
+</dependency>
+```
+
+#### Creating a Flux-Enabled Topology JAR
+The example below illustrates Flux usage with the Maven shade plugin:
+
+ ```xml
+<!-- include Flux and user dependencies in the shaded jar -->
+<dependencies>
+ <!-- Flux include -->
+ <dependency>
+ <groupId>org.apache.storm</groupId>
+ <artifactId>flux-core</artifactId>
+ <version>${storm.version}</version>
+ </dependency>
+ <!-- Flux Wrappers include -->
+ <dependency>
+ <groupId>org.apache.storm</groupId>
+ <artifactId>flux-wrappers</artifactId>
+ <version>${storm.version}</version>
+ </dependency>
+
+ <!-- add user dependencies here... -->
+
+</dependencies>
+<!-- create a fat jar that includes all dependencies -->
+<build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-shade-plugin</artifactId>
+ <version>1.4</version>
+ <configuration>
+ <createDependencyReducedPom>true</createDependencyReducedPom>
+ </configuration>
+ <executions>
+ <execution>
+ <phase>package</phase>
+ <goals>
+ <goal>shade</goal>
+ </goals>
+ <configuration>
+ <transformers>
+ <transformer
+ implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
+ <transformer
+ implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
+ <mainClass>org.apache.storm.flux.Flux</mainClass>
+ </transformer>
+ </transformers>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+</build>
+ ```
+
+### Deploying and Running a Flux Topology
+Once your topology components are packaged with the Flux dependency, you can run different topologies either locally
+or remotely using the `storm jar` command. For example, if your fat jar is named `myTopology-0.1.0-SNAPSHOT.jar` you
+could run it locally with the command:
+
+
+```bash
+storm jar myTopology-0.1.0-SNAPSHOT.jar org.apache.storm.flux.Flux --local my_config.yaml
+
+```
+
+### Command line options
+```
+usage: storm jar <my_topology_uber_jar.jar> org.apache.storm.flux.Flux
+ [options] <topology-config.yaml>
+ -d,--dry-run Do not run or deploy the topology. Just
+ build, validate, and print information about
+ the topology.
+ -e,--env-filter Perform environment variable substitution.
+ Replace keys identified with `${ENV-[NAME]}`
+ will be replaced with the corresponding
+ `NAME` environment value
+ -f,--filter <file> Perform property substitution. Use the
+ specified file as a source of properties,
+ and replace keys identified with {$[property
+ name]} with the value defined in the
+ properties file.
+ -i,--inactive Deploy the topology, but do not activate it.
+ -l,--local Run the topology in local mode.
+ -n,--no-splash Suppress the printing of the splash screen.
+ -q,--no-detail Suppress the printing of topology details.
+ -r,--remote Deploy the topology to a remote cluster.
+ -R,--resource Treat the supplied path as a classpath
+ resource instead of a file.
+ -s,--sleep <ms> When running locally, the amount of time to
+ sleep (in ms.) before killing the topology
+ and shutting down the local cluster.
+ -z,--zookeeper <host:port> When running in local mode, use the
+ ZooKeeper at the specified <host>:<port>
+ instead of the in-process ZooKeeper.
+ (requires Storm 0.9.3 or later)
+```
+
+**NOTE:** Flux tries to avoid command line switch collision with the `storm` command, and allows any other command line
+switches to pass through to the `storm` command.
+
+For example, you can use the `storm` command switch `-c` to override a topology configuration property. The following
+example command will run Flux and override the `nimbus.host` configuration:
+
+```bash
+storm jar myTopology-0.1.0-SNAPSHOT.jar org.apache.storm.flux.Flux --remote my_config.yaml -c nimbus.host=localhost
+```
+
+### Sample output
+```
+███████╗██╗     ██╗   ██╗██╗  ██╗
+██╔════╝██║     ██║   ██║╚██╗██╔╝
+█████╗  ██║     ██║   ██║ ╚███╔╝
+██╔══╝  ██║     ██║   ██║ ██╔██╗
+██║     ███████╗╚██████╔╝██╔╝ ██╗
+╚═╝     ╚══════╝ ╚═════╝ ╚═╝  ╚═╝
++- Apache Storm -+
++- data FLow User eXperience -+
+Version: 0.3.0
+Parsing file: /Users/hsimpson/Projects/donut_domination/storm/shell_test.yaml
+---------- TOPOLOGY DETAILS ----------
+Name: shell-topology
+--------------- SPOUTS ---------------
+sentence-spout[1](org.apache.storm.flux.wrappers.spouts.FluxShellSpout)
+---------------- BOLTS ---------------
+splitsentence[1](org.apache.storm.flux.wrappers.bolts.FluxShellBolt)
+log[1](org.apache.storm.flux.wrappers.bolts.LogInfoBolt)
+count[1](org.apache.storm.testing.TestWordCounter)
+--------------- STREAMS ---------------
+sentence-spout --SHUFFLE--> splitsentence
+splitsentence --FIELDS--> count
+count --SHUFFLE--> log
+--------------------------------------
+Submitting topology: 'shell-topology' to remote cluster...
+```
+
+## YAML Configuration
+Flux topologies are defined in a YAML file that describes a topology. A Flux topology
+definition consists of the following:
+
+ 1. A topology name
+ 2. A list of topology "components" (named Java objects that will be made available in the environment)
+ 3. **EITHER** (A DSL topology definition):
+ * A list of spouts, each identified by a unique ID
+ * A list of bolts, each identified by a unique ID
+ * A list of "stream" objects representing a flow of tuples between spouts and bolts
+ 4. **OR** (A JVM class that can produce an `org.apache.storm.generated.StormTopology` instance):
+ * A `topologySource` definition.
+
+
+
+For example, here is a simple definition of a wordcount topology using the YAML DSL:
+
+```yaml
+name: "yaml-topology"
+config:
+ topology.workers: 1
+
+# spout definitions
+spouts:
+ - id: "spout-1"
+ className: "org.apache.storm.testing.TestWordSpout"
+ parallelism: 1
+
+# bolt definitions
+bolts:
+ - id: "bolt-1"
+ className: "org.apache.storm.testing.TestWordCounter"
+ parallelism: 1
+ - id: "bolt-2"
+ className: "org.apache.storm.flux.wrappers.bolts.LogInfoBolt"
+ parallelism: 1
+
+#stream definitions
+streams:
+ - name: "spout-1 --> bolt-1" # name isn't used (placeholder for logging, UI, etc.)
+ from: "spout-1"
+ to: "bolt-1"
+ grouping:
+ type: FIELDS
+ args: ["word"]
+
+ - name: "bolt-1 --> bolt2"
+ from: "bolt-1"
+ to: "bolt-2"
+ grouping:
+ type: SHUFFLE
+
+
+```
+## Property Substitution/Filtering
+It's common for developers to want to easily switch between configurations, for example switching deployment between
+a development environment and a production environment. This can be accomplished by using separate YAML configuration
+files, but that approach would lead to unnecessary duplication, especially in situations where the Storm topology
+does not change, but configuration settings such as host names, ports, and parallelism parameters do.
+
+For this case, Flux offers properties filtering to allow you to externalize values to a `.properties` file and have
+them substituted before the `.yaml` file is parsed.
+
+To enable property filtering, use the `--filter` command line option and specify a `.properties` file. For example,
+if you invoked flux like so:
+
+```bash
+storm jar myTopology-0.1.0-SNAPSHOT.jar org.apache.storm.flux.Flux --local my_config.yaml --filter dev.properties
+```
+With the following `dev.properties` file:
+
+```properties
+kafka.zookeeper.hosts: localhost:2181
+```
+
+You would then be able to reference those properties by key in your `.yaml` file using `${}` syntax:
+
+```yaml
+ - id: "zkHosts"
+ className: "org.apache.storm.kafka.ZkHosts"
+ constructorArgs:
+ - "${kafka.zookeeper.hosts}"
+```
+
+In this case, Flux would replace `${kafka.zookeeper.hosts}` with `localhost:2181` before parsing the YAML contents.
+
+### Environment Variable Substitution/Filtering
+Flux also allows environment variable substitution. For example, if an environment variable named `ZK_HOSTS` is defined,
+you can reference it in a Flux YAML file with the following syntax:
+
+```
+${ENV-ZK_HOSTS}
+```
+
+## Components
+Components are essentially named object instances that are made available as configuration options for spouts and
+bolts. If you are familiar with the Spring framework, components are roughly analogous to Spring beans.
+
+Every component is identified, at a minimum, by a unique identifier (String) and a class name (String). For example,
+the following will make an instance of the `org.apache.storm.kafka.StringScheme` class available as a reference under the key
+`"stringScheme"` . This assumes the `org.apache.storm.kafka.StringScheme` has a default constructor.
+
+```yaml
+components:
+ - id: "stringScheme"
+ className: "org.apache.storm.kafka.StringScheme"
+```
+
+### Constructor Arguments, References, Properties and Configuration Methods
+
+#### Constructor Arguments
+Arguments to a class constructor can be configured by adding a `constructorArgs` element to a component.
+`constructorArgs` is a list of objects that will be passed to the class' constructor. The following example creates an
+object by calling the constructor that takes a single string as an argument:
+
+```yaml
+ - id: "zkHosts"
+ className: "org.apache.storm.kafka.ZkHosts"
+ constructorArgs:
+ - "localhost:2181"
+ - true
+```
+
+#### References
+Each component instance is identified by a unique id that allows it to be used/reused by other components. To
+reference an existing component, you specify the id of the component with the `ref` tag.
+
+In the following example, a component with the id `"stringScheme"` is created, and later referenced as an argument
+to another component's constructor:
+
+```yaml
+components:
+ - id: "stringScheme"
+ className: "org.apache.storm.kafka.StringScheme"
+
+ - id: "stringMultiScheme"
+ className: "org.apache.storm.spout.SchemeAsMultiScheme"
+ constructorArgs:
+ - ref: "stringScheme" # component with id "stringScheme" must be declared above.
+```
+
+You can also reference existing components in a list by specifying the ids of the components with the `reflist` tag.
+The type of the reflist will be `List<Object>`, but Flux can automatically convert a List to an Array (also varargs),
+so you can use reflist on an argument whose type is `List<Type>`, or `Type[]`, or `Type...`.
+
+Please note that all components in the list must be of the same type.
+
+```yaml
+components:
+ - id: "boundCQLStatementMapperBuilder"
+ className: "org.apache.storm.cassandra.query.builder.BoundCQLStatementMapperBuilder"
+ constructorArgs:
+ - "INSERT INTO sink_cassandra (eventKey, driverId, truckId, driverName) VALUES (?, ?, ?, ?)"
+ configMethods:
+ - name: "bind"
+ args:
+ - reflist: ["FieldSelector-1", "FieldSelector-2", "FieldSelector-3", "FieldSelector-4"]
+```
+
+**N.B.:** References can only be used after (below) the object they point to has been declared.
+
+#### Properties
+In addition to calling constructors with different arguments, Flux also allows you to configure components using
+JavaBean-like setter methods and fields declared as `public`:
+
+```yaml
+ - id: "spoutConfig"
+ className: "org.apache.storm.kafka.SpoutConfig"
+ constructorArgs:
+ # brokerHosts
+ - ref: "zkHosts"
+ # topic
+ - "myKafkaTopic"
+ # zkRoot
+ - "/kafkaSpout"
+ # id
+ - "myId"
+ properties:
+ - name: "ignoreZkOffsets"
+ value: true
+ - name: "scheme"
+ ref: "stringMultiScheme"
+```
+
+In the example above, the `properties` declaration will cause Flux to look for a public method in the `SpoutConfig` with
+the signature `setIgnoreZkOffsets(boolean b)` and attempt to invoke it. If a setter method is not found, Flux will then
+look for a public instance variable with the name `ignoreZkOffsets` and attempt to set its value.
+
+References may also be used as property values.
+
+#### Configuration Methods
+Conceptually, configuration methods are similar to Properties and Constructor Args -- they allow you to invoke an
+arbitrary method on an object after it is constructed. Configuration methods are useful for working with classes that
+don't expose JavaBean methods or have constructors that can fully configure the object. Common examples include classes
+that use the builder pattern for configuration/composition.
+
+The following YAML example creates a bolt and configures it by calling several methods:
+
+```yaml
+bolts:
+ - id: "bolt-1"
+ className: "org.apache.storm.flux.test.TestBolt"
+ parallelism: 1
+ configMethods:
+ - name: "withFoo"
+ args:
+ - "foo"
+ - name: "withNone"
+ # no args needed, so no "args" line
+ - name: "withBar"
+ args:
+ - "bar"
+ - name: "withFooBar"
+ args:
+ - "foo"
+ - "bar"
+```
+
+The signatures of the corresponding methods are as follows:
+
+```java
+ public void withFoo(String foo);
+ public void withNone(); // method with zero arguments
+ public void withBar(String bar);
+ public void withFooBar(String foo, String bar);
+```
+
+Arguments passed to configuration methods work much the same way as constructor arguments, and support references as
+well.
+
+### Using Java `enum`s in Constructor Arguments, References, Properties and Configuration Methods
+You can easily use Java `enum` values as arguments in a Flux YAML file, simply by referencing the name of the `enum`.
+
+For example, [Storm's HDFS module]() includes the following `enum` definition (simplified for brevity):
+
+```java
+public static enum Units {
+ KB, MB, GB, TB
+}
+```
+
+And the `org.apache.storm.hdfs.bolt.rotation.FileSizeRotationPolicy` class has the following constructor:
+
+```java
+public FileSizeRotationPolicy(float count, Units units)
+
+```
+The following Flux `component` definition could be used to call the constructor:
+
+```yaml
+ - id: "rotationPolicy"
+ className: "org.apache.storm.hdfs.bolt.rotation.FileSizeRotationPolicy"
+ constructorArgs:
+ - 5.0
+ - MB
+```
+
+The above definition is functionally equivalent to the following Java code:
+
+```java
+// rotate files when they reach 5MB
+FileRotationPolicy rotationPolicy = new FileSizeRotationPolicy(5.0f, Units.MB);
+```
+
+## Topology Config
+The `config` section is simply a map of Storm topology configuration parameters that will be passed to the
+`org.apache.storm.StormSubmitter` as an instance of the `org.apache.storm.Config` class:
+
+```yaml
+config:
+ topology.workers: 4
+ topology.max.spout.pending: 1000
+ topology.message.timeout.secs: 30
+```
+
+# Existing Topologies
+If you have existing Storm topologies, you can still use Flux to deploy/run/test them. This feature allows you to
+leverage Flux Constructor Arguments, References, Properties, and Topology Config declarations for existing topology
+classes.
+
+The easiest way to use an existing topology class is to define
+a `getTopology()` instance method with one of the following signatures:
+
+```java
+public StormTopology getTopology(Map<String, Object> config)
+```
+or:
+
+```java
+public StormTopology getTopology(Config config)
+```
+
+You could then use the following YAML to configure your topology:
+
+```yaml
+name: "existing-topology"
+topologySource:
+ className: "org.apache.storm.flux.test.SimpleTopology"
+```
+
+If the class you would like to use as a topology source has a different method name (i.e. not `getTopology`), you can
+override it:
+
+```yaml
+name: "existing-topology"
+topologySource:
+ className: "org.apache.storm.flux.test.SimpleTopology"
+ methodName: "getTopologyWithDifferentMethodName"
+```
+
+__N.B.:__ The specified method must accept a single argument of type `java.util.Map<String, Object>` or
+`org.apache.storm.Config`, and return a `org.apache.storm.generated.StormTopology` object.
+
+# YAML DSL
+## Spouts and Bolts
+Spout and Bolts are configured in their own respective section of the YAML configuration. Spout and Bolt definitions
+are extensions to the `component` definition that add a `parallelism` parameter that sets the parallelism for a
+component when the topology is deployed.
+
+Because spout and bolt definitions extend `component` they support constructor arguments, references, and properties as
+well.
+
+Shell spout example:
+
+```yaml
+spouts:
+ - id: "sentence-spout"
+ className: "org.apache.storm.flux.wrappers.spouts.FluxShellSpout"
+ # shell spout constructor takes 2 arguments: String[], String[]
+ constructorArgs:
+ # command line
+ - ["node", "randomsentence.js"]
+ # output fields
+ - ["word"]
+ parallelism: 1
+```
+
+Kafka spout example:
+
+```yaml
+components:
+ - id: "stringScheme"
+ className: "org.apache.storm.kafka.StringScheme"
+
+ - id: "stringMultiScheme"
+ className: "org.apache.storm.spout.SchemeAsMultiScheme"
+ constructorArgs:
+ - ref: "stringScheme"
+
+ - id: "zkHosts"
+ className: "org.apache.storm.kafka.ZkHosts"
+ constructorArgs:
+ - "localhost:2181"
+
+# Alternative kafka config
+# - id: "kafkaConfig"
+# className: "org.apache.storm.kafka.KafkaConfig"
+# constructorArgs:
+# # brokerHosts
+# - ref: "zkHosts"
+# # topic
+# - "myKafkaTopic"
+# # clientId (optional)
+# - "myKafkaClientId"
+
+ - id: "spoutConfig"
+ className: "org.apache.storm.kafka.SpoutConfig"
+ constructorArgs:
+ # brokerHosts
+ - ref: "zkHosts"
+ # topic
+ - "myKafkaTopic"
+ # zkRoot
+ - "/kafkaSpout"
+ # id
+ - "myId"
+ properties:
+ - name: "ignoreZkOffsets"
+ value: true
+ - name: "scheme"
+ ref: "stringMultiScheme"
+
+config:
+ topology.workers: 1
+
+# spout definitions
+spouts:
+ - id: "kafka-spout"
+ className: "org.apache.storm.kafka.KafkaSpout"
+ constructorArgs:
+ - ref: "spoutConfig"
+
+```
+
+Bolt Examples:
+
+```yaml
+# bolt definitions
+bolts:
+ - id: "splitsentence"
+ className: "org.apache.storm.flux.wrappers.bolts.FluxShellBolt"
+ constructorArgs:
+ # command line
+ - ["python", "splitsentence.py"]
+ # output fields
+ - ["word"]
+ parallelism: 1
+ # ...
+
+ - id: "log"
+ className: "org.apache.storm.flux.wrappers.bolts.LogInfoBolt"
+ parallelism: 1
+ # ...
+
+ - id: "count"
+ className: "org.apache.storm.testing.TestWordCounter"
+ parallelism: 1
+ # ...
+```
+## Streams and Stream Groupings
+Streams in Flux are represented as a list of connections (Graph edges, data flow, etc.) between the Spouts and Bolts in
+a topology, with an associated Grouping definition.
+
+A Stream definition has the following properties:
+
+**`name`:** A name for the connection (optional, currently unused)
+
+**`from`:** The `id` of a Spout or Bolt that is the source (publisher)
+
+**`to`:** The `id` of a Spout or Bolt that is the destination (subscriber)
+
+**`grouping`:** The stream grouping definition for the Stream
+
+A Grouping definition has the following properties:
+
+**`type`:** The type of grouping. One of `ALL`,`CUSTOM`,`DIRECT`,`SHUFFLE`,`LOCAL_OR_SHUFFLE`,`FIELDS`,`GLOBAL`, or `NONE`.
+
+**`streamId`:** The Storm stream ID (Optional. If unspecified will use the default stream)
+
+**`args`:** For the `FIELDS` grouping, a list of field names.
+
+**`customClass`** For the `CUSTOM` grouping, a definition of custom grouping class instance
+
+The `streams` definition example below sets up a topology with the following wiring:
+
+```
+ kafka-spout --> splitsentence --> count --> log
+```
+
+
+```yaml
+#stream definitions
+# stream definitions define connections between spouts and bolts.
+# note that such connections can be cyclical
+# custom stream groupings are also supported
+
+streams:
+ - name: "kafka --> split" # name isn't used (placeholder for logging, UI, etc.)
+ from: "kafka-spout"
+ to: "splitsentence"
+ grouping:
+ type: SHUFFLE
+
+ - name: "split --> count"
+ from: "splitsentence"
+ to: "count"
+ grouping:
+ type: FIELDS
+ args: ["word"]
+
+ - name: "count --> log"
+ from: "count"
+ to: "log"
+ grouping:
+ type: SHUFFLE
+```
+
+### Custom Stream Groupings
+Custom stream groupings are defined by setting the grouping type to `CUSTOM` and defining a `customClass` parameter
+that tells Flux how to instantiate the custom class. The `customClass` definition extends `component`, so it supports
+constructor arguments, references, and properties as well.
+
+The example below creates a Stream with an instance of the `org.apache.storm.testing.NGrouping` custom stream grouping
+class.
+
+```yaml
+ - name: "bolt-1 --> bolt2"
+ from: "bolt-1"
+ to: "bolt-2"
+ grouping:
+ type: CUSTOM
+ customClass:
+ className: "org.apache.storm.testing.NGrouping"
+ constructorArgs:
+ - 1
+```
+
+## Includes and Overrides
+Flux allows you to include the contents of other YAML files, and have them treated as though they were defined in the
+same file. Includes may be either files, or classpath resources.
+
+Includes are specified as a list of maps:
+
+```yaml
+includes:
+ - resource: false
+ file: "src/test/resources/configs/shell_test.yaml"
+ override: false
+```
+
+If the `resource` property is set to `true`, the include will be loaded as a classpath resource from the value of the
+`file` attribute, otherwise it will be treated as a regular file.
+
+The `override` property controls how includes affect the values defined in the current file. If `override` is set to
+`true`, values in the included file will replace values in the current file being parsed. If `override` is set to
+`false`, values in the current file being parsed will take precedence, and the parser will refuse to replace them.
+
+**N.B.:** Includes are not yet recursive. Includes from included files will be ignored.
+
+
+## Basic Word Count Example
+
+This example uses a spout implemented in JavaScript, a bolt implemented in Python, and a bolt implemented in Java.
+
+Topology YAML config:
+
+```yaml
+---
+name: "shell-topology"
+config:
+ topology.workers: 1
+
+# spout definitions
+spouts:
+ - id: "sentence-spout"
+ className: "org.apache.storm.flux.wrappers.spouts.FluxShellSpout"
+ # shell spout constructor takes 2 arguments: String[], String[]
+ constructorArgs:
+ # command line
+ - ["node", "randomsentence.js"]
+ # output fields
+ - ["word"]
+ parallelism: 1
+
+# bolt definitions
+bolts:
+ - id: "splitsentence"
+ className: "org.apache.storm.flux.wrappers.bolts.FluxShellBolt"
+ constructorArgs:
+ # command line
+ - ["python", "splitsentence.py"]
+ # output fields
+ - ["word"]
+ parallelism: 1
+
+ - id: "log"
+ className: "org.apache.storm.flux.wrappers.bolts.LogInfoBolt"
+ parallelism: 1
+
+ - id: "count"
+ className: "org.apache.storm.testing.TestWordCounter"
+ parallelism: 1
+
+#stream definitions
+# stream definitions define connections between spouts and bolts.
+# note that such connections can be cyclical
+# custom stream groupings are also supported
+
+streams:
+ - name: "spout --> split" # name isn't used (placeholder for logging, UI, etc.)
+ from: "sentence-spout"
+ to: "splitsentence"
+ grouping:
+ type: SHUFFLE
+
+ - name: "split --> count"
+ from: "splitsentence"
+ to: "count"
+ grouping:
+ type: FIELDS
+ args: ["word"]
+
+ - name: "count --> log"
+ from: "count"
+ to: "log"
+ grouping:
+ type: SHUFFLE
+```
+
+
+## Micro-Batching (Trident) API Support
+Currently, the Flux YAML DSL only supports the Core Storm API, but support for Storm's micro-batching API is planned.
+
+To use Flux with a Trident topology, define a topology getter method and reference it in your YAML config:
+
+```yaml
+name: "my-trident-topology"
+
+config:
+ topology.workers: 1
+
+topologySource:
+ className: "org.apache.storm.flux.test.TridentTopologySource"
+ # Flux will look for "getTopology", this will override that.
+ methodName: "getTopologyWithDifferentMethodName"
+```
+
+## Committer Sponsors
+
+ * P. Taylor Goetz ([ptgoetz@apache.org](mailto:ptgoetz@apache.org))
http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/flux/flux-core/pom.xml
----------------------------------------------------------------------
diff --git a/flux/flux-core/pom.xml b/flux/flux-core/pom.xml
new file mode 100644
index 0000000..1cf4a9b
--- /dev/null
+++ b/flux/flux-core/pom.xml
@@ -0,0 +1,112 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+
+ <parent>
+ <groupId>org.apache.storm</groupId>
+ <artifactId>flux</artifactId>
+ <version>2.0.0-SNAPSHOT</version>
+ <relativePath>../pom.xml</relativePath>
+ </parent>
+
+ <artifactId>flux-core</artifactId>
+ <packaging>jar</packaging>
+
+ <name>flux-core</name>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.storm</groupId>
+ <artifactId>flux-wrappers</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.storm</groupId>
+ <artifactId>storm-kafka</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.storm</groupId>
+ <artifactId>storm-hdfs</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.storm</groupId>
+ <artifactId>storm-hbase</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.yaml</groupId>
+ <artifactId>snakeyaml</artifactId>
+ </dependency>
+ </dependencies>
+ <build>
+ <resources>
+ <resource>
+ <directory>src/main/resources</directory>
+ <filtering>true</filtering>
+ </resource>
+ </resources>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-shade-plugin</artifactId>
+ <configuration>
+ <createDependencyReducedPom>true</createDependencyReducedPom>
+ <filters>
+ <filter>
+ <artifact>*:*</artifact>
+ <excludes>
+ <exclude>META-INF/*.SF</exclude>
+ <exclude>META-INF/*.sf</exclude>
+ <exclude>META-INF/*.DSA</exclude>
+ <exclude>META-INF/*.dsa</exclude>
+ <exclude>META-INF/*.RSA</exclude>
+ <exclude>META-INF/*.rsa</exclude>
+ <exclude>META-INF/*.EC</exclude>
+ <exclude>META-INF/*.ec</exclude>
+ <exclude>META-INF/MSFTSIG.SF</exclude>
+ <exclude>META-INF/MSFTSIG.RSA</exclude>
+ </excludes>
+ </filter>
+ </filters>
+ </configuration>
+ <executions>
+ <execution>
+ <phase>package</phase>
+ <goals>
+ <goal>shade</goal>
+ </goals>
+ <configuration>
+ <transformers>
+ <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer" />
+ <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
+ <mainClass>org.apache.storm.flux.Flux</mainClass>
+ </transformer>
+ </transformers>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+</project>
http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/flux/flux-core/src/main/java/org/apache/storm/flux/Flux.java
----------------------------------------------------------------------
diff --git a/flux/flux-core/src/main/java/org/apache/storm/flux/Flux.java b/flux/flux-core/src/main/java/org/apache/storm/flux/Flux.java
new file mode 100644
index 0000000..222bf2d
--- /dev/null
+++ b/flux/flux-core/src/main/java/org/apache/storm/flux/Flux.java
@@ -0,0 +1,259 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.flux;
+
+import org.apache.storm.Config;
+import org.apache.storm.LocalCluster;
+import org.apache.storm.LocalCluster.LocalTopology;
+import org.apache.storm.StormSubmitter;
+import org.apache.storm.generated.StormTopology;
+import org.apache.storm.generated.SubmitOptions;
+import org.apache.storm.generated.TopologyInitialStatus;
+import org.apache.storm.utils.Utils;
+import org.apache.commons.cli.*;
+import org.apache.storm.flux.model.*;
+import org.apache.storm.flux.parser.FluxParser;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.*;
+
+/**
+ * Flux entry point.
+ *
+ */
+public class Flux {
+ private static final Logger LOG = LoggerFactory.getLogger(Flux.class);
+
+ private static final Long DEFAULT_LOCAL_SLEEP_TIME = 60000l;
+
+ private static final Long DEFAULT_ZK_PORT = 2181l;
+
+ private static final String OPTION_LOCAL = "local";
+ private static final String OPTION_REMOTE = "remote";
+ private static final String OPTION_RESOURCE = "resource";
+ private static final String OPTION_SLEEP = "sleep";
+ private static final String OPTION_DRY_RUN = "dry-run";
+ private static final String OPTION_NO_DETAIL = "no-detail";
+ private static final String OPTION_NO_SPLASH = "no-splash";
+ private static final String OPTION_INACTIVE = "inactive";
+ private static final String OPTION_ZOOKEEPER = "zookeeper";
+ private static final String OPTION_FILTER = "filter";
+ private static final String OPTION_ENV_FILTER = "env-filter";
+
+ public static void main(String[] args) throws Exception {
+ Options options = new Options();
+
+ options.addOption(option(0, "l", OPTION_LOCAL, "Run the topology in local mode."));
+
+ options.addOption(option(0, "r", OPTION_REMOTE, "Deploy the topology to a remote cluster."));
+
+ options.addOption(option(0, "R", OPTION_RESOURCE, "Treat the supplied path as a classpath resource instead of a file."));
+
+ options.addOption(option(1, "s", OPTION_SLEEP, "ms", "When running locally, the amount of time to sleep (in ms.) " +
+ "before killing the topology and shutting down the local cluster."));
+
+ options.addOption(option(0, "d", OPTION_DRY_RUN, "Do not run or deploy the topology. Just build, validate, " +
+ "and print information about the topology."));
+
+ options.addOption(option(0, "q", OPTION_NO_DETAIL, "Suppress the printing of topology details."));
+
+ options.addOption(option(0, "n", OPTION_NO_SPLASH, "Suppress the printing of the splash screen."));
+
+ options.addOption(option(0, "i", OPTION_INACTIVE, "Deploy the topology, but do not activate it."));
+
+ options.addOption(option(1, "z", OPTION_ZOOKEEPER, "host:port", "When running in local mode, use the ZooKeeper at the " +
+ "specified <host>:<port> instead of the in-process ZooKeeper. (requires Storm 0.9.3 or later)"));
+
+ options.addOption(option(1, "f", OPTION_FILTER, "file", "Perform property substitution. Use the specified file " +
+ "as a source of properties, and replace keys identified with {$[property name]} with the value defined " +
+ "in the properties file."));
+
+ options.addOption(option(0, "e", OPTION_ENV_FILTER, "Perform environment variable substitution. Replace keys" +
+ "identified with `${ENV-[NAME]}` will be replaced with the corresponding `NAME` environment value"));
+
+ CommandLineParser parser = new BasicParser();
+ CommandLine cmd = parser.parse(options, args);
+
+ if (cmd.getArgs().length != 1) {
+ usage(options);
+ System.exit(1);
+ }
+ runCli(cmd);
+ }
+
    /**
     * Convenience overload of {@link #option(int, String, String, String, String)}
     * that reuses the long option name as the displayed argument name.
     */
    private static Option option(int argCount, String shortName, String longName, String description){
        return option(argCount, shortName, longName, longName, description);
    }
+
+ private static Option option(int argCount, String shortName, String longName, String argName, String description){
+ Option option = OptionBuilder.hasArgs(argCount)
+ .withArgName(argName)
+ .withLongOpt(longName)
+ .withDescription(description)
+ .create(shortName);
+ return option;
+ }
+
+ private static void usage(Options options) {
+ HelpFormatter formatter = new HelpFormatter();
+ formatter.printHelp("storm jar <my_topology_uber_jar.jar> " +
+ Flux.class.getName() +
+ " [options] <topology-config.yaml>", options);
+ }
+
    /**
     * Execute the parsed command line: load and parse the topology definition,
     * build the topology, then (unless this is a dry run) submit it remotely or
     * run it in a local cluster for a configurable amount of time.
     *
     * @param cmd parsed command line; the single positional argument is the
     *            topology definition (file path or classpath resource)
     * @throws Exception if parsing, building, or submitting the topology fails
     */
    private static void runCli(CommandLine cmd)throws Exception {
        if(!cmd.hasOption(OPTION_NO_SPLASH)) {
            printSplash();
        }

        // whether the parsed YAML should be echoed (controlled by the "dump-yaml" flag)
        boolean dumpYaml = cmd.hasOption("dump-yaml");

        TopologyDef topologyDef = null;
        String filePath = (String)cmd.getArgList().get(0);

        // TODO conditionally load properties from a file or resource
        String filterProps = null;
        if(cmd.hasOption(OPTION_FILTER)){
            filterProps = cmd.getOptionValue(OPTION_FILTER);
        }


        boolean envFilter = cmd.hasOption(OPTION_ENV_FILTER);
        // load the topology definition from the classpath or the filesystem
        if(cmd.hasOption(OPTION_RESOURCE)){
            printf("Parsing classpath resource: %s", filePath);
            topologyDef = FluxParser.parseResource(filePath, dumpYaml, true, filterProps, envFilter);
        } else {
            printf("Parsing file: %s",
                    new File(filePath).getAbsolutePath());
            topologyDef = FluxParser.parseFile(filePath, dumpYaml, true, filterProps, envFilter);
        }


        String topologyName = topologyDef.getName();
        // merge contents of `config` into topology config
        Config conf = FluxBuilder.buildConfig(topologyDef);
        ExecutionContext context = new ExecutionContext(topologyDef, conf);
        StormTopology topology = FluxBuilder.buildTopology(context);

        if(!cmd.hasOption(OPTION_NO_DETAIL)){
            printTopologyInfo(context);
        }

        if(!cmd.hasOption(OPTION_DRY_RUN)) {
            if (cmd.hasOption(OPTION_REMOTE)) {
                LOG.info("Running remotely...");
                // should the topology be active or inactive
                SubmitOptions submitOptions = null;
                if(cmd.hasOption(OPTION_INACTIVE)){
                    LOG.info("Deploying topology in an INACTIVE state...");
                    submitOptions = new SubmitOptions(TopologyInitialStatus.INACTIVE);
                } else {
                    LOG.info("Deploying topology in an ACTIVE state...");
                    submitOptions = new SubmitOptions(TopologyInitialStatus.ACTIVE);
                }
                StormSubmitter.submitTopology(topologyName, conf, topology, submitOptions, null);
            } else {
                LOG.info("Running in local mode...");

                // how long to keep the local cluster alive before shutting down
                String sleepStr = cmd.getOptionValue(OPTION_SLEEP);
                Long sleepTime = DEFAULT_LOCAL_SLEEP_TIME;
                if (sleepStr != null) {
                    sleepTime = Long.parseLong(sleepStr);
                }
                LOG.debug("Sleep time: {}", sleepTime);
                LocalCluster cluster = null;

                // in-process or external zookeeper
                if(cmd.hasOption(OPTION_ZOOKEEPER)){
                    String zkStr = cmd.getOptionValue(OPTION_ZOOKEEPER);
                    LOG.info("Using ZooKeeper at '{}' instead of in-process one.", zkStr);
                    long zkPort = DEFAULT_ZK_PORT;
                    String zkHost = null;
                    // accept either "host" or "host:port"; default the port when omitted
                    if(zkStr.contains(":")){
                        String[] hostPort = zkStr.split(":");
                        zkHost = hostPort[0];
                        zkPort = hostPort.length > 1 ? Long.parseLong(hostPort[1]) : DEFAULT_ZK_PORT;

                    } else {
                        zkHost = zkStr;
                    }
                    // the following constructor is only available in 0.9.3 and later
                    try {
                        cluster = new LocalCluster(zkHost, zkPort);
                    } catch (NoSuchMethodError e){
                        LOG.error("The --zookeeper option can only be used with Apache Storm 0.9.3 and later.");
                        System.exit(1);
                    }
                } else {
                    cluster = new LocalCluster();
                }
                // LocalTopology is AutoCloseable: closing it kills the topology;
                // the finally block then shuts down the local cluster itself
                try (LocalTopology topo = cluster.submitTopology(topologyName, conf, topology)) {
                    Utils.sleep(sleepTime);
                } finally {
                    cluster.shutdown();
                }
            }
        }
    }
+
+ static void printTopologyInfo(ExecutionContext ctx){
+ TopologyDef t = ctx.getTopologyDef();
+ if(t.isDslTopology()) {
+ print("---------- TOPOLOGY DETAILS ----------");
+
+ printf("Topology Name: %s", t.getName());
+ print("--------------- SPOUTS ---------------");
+ for (SpoutDef s : t.getSpouts()) {
+ printf("%s [%d] (%s)", s.getId(), s.getParallelism(), s.getClassName());
+ }
+ print("---------------- BOLTS ---------------");
+ for (BoltDef b : t.getBolts()) {
+ printf("%s [%d] (%s)", b.getId(), b.getParallelism(), b.getClassName());
+ }
+
+ print("--------------- STREAMS ---------------");
+ for (StreamDef sd : t.getStreams()) {
+ printf("%s --%s--> %s", sd.getFrom(), sd.getGrouping().getType(), sd.getTo());
+ }
+ print("--------------------------------------");
+ }
+ }
+
+ // save a little typing
+ private static void printf(String format, Object... args){
+ print(String.format(format, args));
+ }
+
+ private static void print(String string){
+ System.out.println(string);
+ }
+
+ private static void printSplash() throws IOException {
+ // banner
+ InputStream is = Flux.class.getResourceAsStream("/splash.txt");
+ if(is != null){
+ InputStreamReader isr = new InputStreamReader(is, "UTF-8");
+ BufferedReader br = new BufferedReader(isr);
+ String line = null;
+ while((line = br.readLine()) != null){
+ System.out.println(line);
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/flux/flux-core/src/main/java/org/apache/storm/flux/FluxBuilder.java
----------------------------------------------------------------------
diff --git a/flux/flux-core/src/main/java/org/apache/storm/flux/FluxBuilder.java b/flux/flux-core/src/main/java/org/apache/storm/flux/FluxBuilder.java
new file mode 100644
index 0000000..e79dfb7
--- /dev/null
+++ b/flux/flux-core/src/main/java/org/apache/storm/flux/FluxBuilder.java
@@ -0,0 +1,630 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.flux;
+
+import org.apache.storm.Config;
+import org.apache.storm.generated.StormTopology;
+import org.apache.storm.grouping.CustomStreamGrouping;
+import org.apache.storm.topology.*;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.utils.Utils;
+import org.apache.storm.flux.model.*;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.lang.reflect.*;
+import java.util.*;
+
+public class FluxBuilder {
+ private static Logger LOG = LoggerFactory.getLogger(FluxBuilder.class);
+
+ /**
+ * Given a topology definition, return a populated `org.apache.storm.Config` instance.
+ *
+ * @param topologyDef
+ * @return
+ */
+ public static Config buildConfig(TopologyDef topologyDef) {
+ // merge contents of `config` into topology config
+ Config conf = new Config();
+ conf.putAll(topologyDef.getConfig());
+ return conf;
+ }
+
    /**
     * Given a topology definition, return a Storm topology that can be run either locally or remotely.
     *
     * Shared components are constructed first (so spouts/bolts/groupings can
     * reference them); then either a DSL-defined topology is wired up, or the
     * build is delegated to a user-supplied topology source class.
     *
     * @param context execution context wrapping the topology definition and config
     * @return the constructed StormTopology
     * @throws IllegalArgumentException if the definition mixes DSL elements with a topologySource
     * @throws IllegalAccessException
     * @throws InstantiationException
     * @throws ClassNotFoundException
     * @throws NoSuchMethodException
     * @throws InvocationTargetException
     */
    public static StormTopology buildTopology(ExecutionContext context) throws IllegalAccessException,
            InstantiationException, ClassNotFoundException, NoSuchMethodException, InvocationTargetException, NoSuchFieldException {

        StormTopology topology = null;
        TopologyDef topologyDef = context.getTopologyDef();

        if(!topologyDef.validate()){
            throw new IllegalArgumentException("Invalid topology config. Spouts, bolts and streams cannot be " +
                    "defined in the same configuration as a topologySource.");
        }

        // build components that may be referenced by spouts, bolts, etc.
        // the map will be a String --> Object where the object is a fully
        // constructed class instance
        buildComponents(context);

        if(topologyDef.isDslTopology()) {
            // This is a DSL (YAML, etc.) topology...
            LOG.info("Detected DSL topology...");

            TopologyBuilder builder = new TopologyBuilder();

            // create spouts
            buildSpouts(context, builder);

            // we need to be able to lookup bolts by id, then switch based
            // on whether they are IBasicBolt or IRichBolt instances
            buildBolts(context);

            // process stream definitions
            buildStreamDefinitions(context, builder);

            topology = builder.createTopology();
        } else {
            // user class supplied...
            // this also provides a bridge to Trident...
            LOG.info("A topology source has been specified...");
            ObjectDef def = topologyDef.getTopologySource();
            topology = buildExternalTopology(def, context);
        }
        return topology;
    }
+
+ /**
+ * Given a `java.lang.Object` instance and a method name, attempt to find a method that matches the input
+ * parameter: `java.util.Map` or `org.apache.storm.Config`.
+ *
+ * @param topologySource object to inspect for the specified method
+ * @param methodName name of the method to look for
+ * @return
+ * @throws NoSuchMethodException
+ */
+ private static Method findGetTopologyMethod(Object topologySource, String methodName) throws NoSuchMethodException {
+ Class clazz = topologySource.getClass();
+ Method[] methods = clazz.getMethods();
+ ArrayList<Method> candidates = new ArrayList<Method>();
+ for(Method method : methods){
+ if(!method.getName().equals(methodName)){
+ continue;
+ }
+ if(!method.getReturnType().equals(StormTopology.class)){
+ continue;
+ }
+ Class[] paramTypes = method.getParameterTypes();
+ if(paramTypes.length != 1){
+ continue;
+ }
+ if(paramTypes[0].isAssignableFrom(Map.class) || paramTypes[0].isAssignableFrom(Config.class)){
+ candidates.add(method);
+ }
+ }
+
+ if(candidates.size() == 0){
+ throw new IllegalArgumentException("Unable to find method '" + methodName + "' method in class: " + clazz.getName());
+ } else if (candidates.size() > 1){
+ LOG.warn("Found multiple candidate methods in class '" + clazz.getName() + "'. Using the first one found");
+ }
+
+ return candidates.get(0);
+ }
+
    /**
     * Wire up the `streams` section of a DSL topology: register each target
     * bolt with the TopologyBuilder (once per bolt id, tracked in `declarers`)
     * and apply the declared grouping between the stream's source and target.
     *
     * @param context execution context holding the instantiated bolts
     * @param builder the TopologyBuilder being populated
     * @throws IllegalArgumentException if a stream target is not a recognized bolt type
     * @throws UnsupportedOperationException if a grouping type is not handled
     */
    private static void buildStreamDefinitions(ExecutionContext context, TopologyBuilder builder)
            throws ClassNotFoundException, NoSuchMethodException, InvocationTargetException, InstantiationException,
            IllegalAccessException, NoSuchFieldException {
        TopologyDef topologyDef = context.getTopologyDef();
        // process stream definitions
        HashMap<String, BoltDeclarer> declarers = new HashMap<String, BoltDeclarer>();
        for (StreamDef stream : topologyDef.getStreams()) {
            Object boltObj = context.getBolt(stream.getTo());
            BoltDeclarer declarer = declarers.get(stream.getTo());
            // register the bolt on first sight; the instanceof checks (and casts)
            // select the correct setBolt() overload for the bolt's interface type
            if (boltObj instanceof IRichBolt) {
                if(declarer == null) {
                    declarer = builder.setBolt(stream.getTo(),
                            (IRichBolt) boltObj,
                            topologyDef.parallelismForBolt(stream.getTo()));
                    declarers.put(stream.getTo(), declarer);
                }
            } else if (boltObj instanceof IBasicBolt) {
                if(declarer == null) {
                    declarer = builder.setBolt(
                            stream.getTo(),
                            (IBasicBolt) boltObj,
                            topologyDef.parallelismForBolt(stream.getTo()));
                    declarers.put(stream.getTo(), declarer);
                }
            } else if (boltObj instanceof IWindowedBolt) {
                if(declarer == null) {
                    declarer = builder.setBolt(
                            stream.getTo(),
                            (IWindowedBolt) boltObj,
                            topologyDef.parallelismForBolt(stream.getTo()));
                    declarers.put(stream.getTo(), declarer);
                }
            } else if (boltObj instanceof IStatefulBolt) {
                if(declarer == null) {
                    declarer = builder.setBolt(
                            stream.getTo(),
                            (IStatefulBolt) boltObj,
                            topologyDef.parallelismForBolt(stream.getTo()));
                    declarers.put(stream.getTo(), declarer);
                }
            } else {
                throw new IllegalArgumentException("Class does not appear to be a bolt: " +
                        boltObj.getClass().getName());
            }

            GroupingDef grouping = stream.getGrouping();
            // if the streamId is defined, use it for the grouping, otherwise assume storm's default stream
            String streamId = (grouping.getStreamId() == null ? Utils.DEFAULT_STREAM_ID : grouping.getStreamId());


            // apply the declared grouping between the source and this bolt
            switch (grouping.getType()) {
                case SHUFFLE:
                    declarer.shuffleGrouping(stream.getFrom(), streamId);
                    break;
                case FIELDS:
                    //TODO check for null grouping args
                    declarer.fieldsGrouping(stream.getFrom(), streamId, new Fields(grouping.getArgs()));
                    break;
                case ALL:
                    declarer.allGrouping(stream.getFrom(), streamId);
                    break;
                case DIRECT:
                    declarer.directGrouping(stream.getFrom(), streamId);
                    break;
                case GLOBAL:
                    declarer.globalGrouping(stream.getFrom(), streamId);
                    break;
                case LOCAL_OR_SHUFFLE:
                    declarer.localOrShuffleGrouping(stream.getFrom(), streamId);
                    break;
                case NONE:
                    declarer.noneGrouping(stream.getFrom(), streamId);
                    break;
                case CUSTOM:
                    // custom groupings are instantiated from their ObjectDef
                    declarer.customGrouping(stream.getFrom(), streamId,
                            buildCustomStreamGrouping(stream.getGrouping().getCustomClass(), context));
                    break;
                default:
                    throw new UnsupportedOperationException("unsupported grouping type: " + grouping);
            }
        }
    }
+
    /**
     * Apply the `properties` section of a bean definition to an instance: for
     * each property, prefer a public setter; otherwise fall back to a public
     * field of the same name. Reference-valued properties are resolved against
     * the execution context first.
     *
     * NOTE(review): findPublicField() delegates to Class.getField(), which
     * throws NoSuchFieldException when the field is absent rather than
     * returning null — the null-check on `field` below appears unreachable;
     * confirm the intended behavior for missing properties.
     *
     * @param bean bean definition whose properties should be applied
     * @param instance the freshly constructed object to mutate
     * @param context used to resolve reference-valued properties to components
     */
    private static void applyProperties(ObjectDef bean, Object instance, ExecutionContext context) throws
            IllegalAccessException, InvocationTargetException, NoSuchFieldException {
        List<PropertyDef> props = bean.getProperties();
        Class clazz = instance.getClass();
        if (props != null) {
            for (PropertyDef prop : props) {
                // a property value is either a literal or a reference to a component
                Object value = prop.isReference() ? context.getComponent(prop.getRef()) : prop.getValue();
                Method setter = findSetter(clazz, prop.getName(), value);
                if (setter != null) {
                    LOG.debug("found setter, attempting to invoke");
                    // invoke setter
                    setter.invoke(instance, new Object[]{value});
                } else {
                    // look for a public instance variable
                    LOG.debug("no setter found. Looking for a public instance variable...");
                    Field field = findPublicField(clazz, prop.getName(), value);
                    if (field != null) {
                        field.set(instance, value);
                    }
                }
            }
        }
    }
+
+ private static Field findPublicField(Class clazz, String property, Object arg) throws NoSuchFieldException {
+ Field field = clazz.getField(property);
+ return field;
+ }
+
+ private static Method findSetter(Class clazz, String property, Object arg) {
+ String setterName = toSetterName(property);
+ Method retval = null;
+ Method[] methods = clazz.getMethods();
+ for (Method method : methods) {
+ if (setterName.equals(method.getName())) {
+ LOG.debug("Found setter method: " + method.getName());
+ retval = method;
+ }
+ }
+ return retval;
+ }
+
+ private static String toSetterName(String name) {
+ return "set" + name.substring(0, 1).toUpperCase() + name.substring(1, name.length());
+ }
+
+ private static List<Object> resolveReferences(List<Object> args, ExecutionContext context) {
+ LOG.debug("Checking arguments for references.");
+ List<Object> cArgs = new ArrayList<Object>();
+ // resolve references
+ for (Object arg : args) {
+ if (arg instanceof BeanReference) {
+ cArgs.add(context.getComponent(((BeanReference) arg).getId()));
+ } else if (arg instanceof BeanListReference) {
+ List<Object> components = new ArrayList<>();
+ BeanListReference ref = (BeanListReference) arg;
+ for (String id : ref.getIds()) {
+ components.add(context.getComponent(id));
+ }
+
+ LOG.debug("BeanListReference resolved as {}", components);
+ cArgs.add(components);
+ } else {
+ cArgs.add(arg);
+ }
+ }
+ return cArgs;
+ }
+
    /**
     * Instantiate the class named by the given definition. If constructor
     * arguments are declared, references are resolved and a compatible
     * constructor is located (with list/primitive coercion); otherwise the
     * no-arg constructor is used. Order matters: construct, then apply
     * properties, then invoke configuration methods.
     *
     * @param def object definition (class name, constructor args, properties, config methods)
     * @param context used to resolve component references
     * @return the fully configured instance
     * @throws IllegalArgumentException if no suitable constructor can be found
     */
    private static Object buildObject(ObjectDef def, ExecutionContext context) throws ClassNotFoundException,
            IllegalAccessException, InstantiationException, NoSuchMethodException, InvocationTargetException, NoSuchFieldException {
        Class clazz = Class.forName(def.getClassName());
        Object obj = null;
        if (def.hasConstructorArgs()) {
            LOG.debug("Found constructor arguments in definition: " + def.getConstructorArgs().getClass().getName());
            List<Object> cArgs = def.getConstructorArgs();
            if(def.hasReferences()){
                cArgs = resolveReferences(cArgs, context);
            }
            Constructor con = findCompatibleConstructor(cArgs, clazz);
            if (con != null) {
                LOG.debug("Found something seemingly compatible, attempting invocation...");
                // coerce Lists to arrays / boxed values to primitives before invoking
                obj = con.newInstance(getArgsWithListCoercian(cArgs, con.getParameterTypes()));
            } else {
                String msg = String.format("Couldn't find a suitable constructor for class '%s' with arguments '%s'.",
                        clazz.getName(),
                        cArgs);
                throw new IllegalArgumentException(msg);
            }
        } else {
            // no declared constructor args: use the no-arg constructor
            obj = clazz.newInstance();
        }
        applyProperties(def, obj, context);
        invokeConfigMethods(def, obj, context);
        return obj;
    }
+
    /**
     * Build a topology from a user-supplied topology source: construct the
     * source object from its definition, locate its getTopology-style method
     * (see {@link #findGetTopologyMethod}), and invoke it with either a Storm
     * Config or the raw config Map depending on the declared parameter type.
     */
    private static StormTopology buildExternalTopology(ObjectDef def, ExecutionContext context)
            throws ClassNotFoundException, IllegalAccessException, InstantiationException, NoSuchMethodException,
            InvocationTargetException, NoSuchFieldException {

        Object topologySource = buildObject(def, context);

        String methodName = context.getTopologyDef().getTopologySource().getMethodName();
        Method getTopology = findGetTopologyMethod(topologySource, methodName);
        if(getTopology.getParameterTypes()[0].equals(Config.class)){
            // the method wants a Storm Config; wrap the raw config map
            Config config = new Config();
            config.putAll(context.getTopologyDef().getConfig());
            return (StormTopology) getTopology.invoke(topologySource, config);
        } else {
            // the method accepts a plain Map
            return (StormTopology) getTopology.invoke(topologySource, context.getTopologyDef().getConfig());
        }
    }
+
+ private static CustomStreamGrouping buildCustomStreamGrouping(ObjectDef def, ExecutionContext context)
+ throws ClassNotFoundException,
+ IllegalAccessException, InstantiationException, NoSuchMethodException, InvocationTargetException, NoSuchFieldException {
+ Object grouping = buildObject(def, context);
+ return (CustomStreamGrouping)grouping;
+ }
+
+ /**
+ * Given a topology definition, resolve and instantiate all components found and return a map
+ * keyed by the component id.
+ */
+ private static void buildComponents(ExecutionContext context) throws ClassNotFoundException, NoSuchMethodException,
+ IllegalAccessException, InvocationTargetException, InstantiationException, NoSuchFieldException {
+ Collection<BeanDef> cDefs = context.getTopologyDef().getComponents();
+ if (cDefs != null) {
+ for (BeanDef bean : cDefs) {
+ Object obj = buildObject(bean, context);
+ context.addComponent(bean.getId(), obj);
+ }
+ }
+ }
+
+
+ private static void buildSpouts(ExecutionContext context, TopologyBuilder builder) throws ClassNotFoundException,
+ NoSuchMethodException, InvocationTargetException, InstantiationException, IllegalAccessException, NoSuchFieldException {
+ for (SpoutDef sd : context.getTopologyDef().getSpouts()) {
+ IRichSpout spout = buildSpout(sd, context);
+ builder.setSpout(sd.getId(), spout, sd.getParallelism());
+ context.addSpout(sd.getId(), spout);
+ }
+ }
+
    /**
     * Given a spout definition, return a Storm spout implementation by attempting to find a matching constructor
     * in the given spout class. Perform list to array conversion as necessary.
     *
     * @param def spout definition (class name, constructor args, properties)
     * @param context used to resolve component references
     * @return the instantiated spout; a ClassCastException surfaces if the
     *         configured class does not implement IRichSpout
     */
    private static IRichSpout buildSpout(SpoutDef def, ExecutionContext context) throws ClassNotFoundException,
            IllegalAccessException, InstantiationException, NoSuchMethodException, InvocationTargetException, NoSuchFieldException {
        return (IRichSpout)buildObject(def, context);
    }
+
+ /**
+ * Given a list of bolt definitions, build a map of Storm bolts with the bolt definition id as the key.
+ * Attempt to coerce the given constructor arguments to a matching bolt constructor as much as possible.
+ */
+ private static void buildBolts(ExecutionContext context) throws ClassNotFoundException, IllegalAccessException,
+ InstantiationException, NoSuchMethodException, InvocationTargetException, NoSuchFieldException {
+ for (BoltDef def : context.getTopologyDef().getBolts()) {
+ Class clazz = Class.forName(def.getClassName());
+ Object bolt = buildObject(def, context);
+ context.addBolt(def.getId(), bolt);
+ }
+ }
+
+ /**
+ * Given a list of constructor arguments, and a target class, attempt to find a suitable constructor.
+ *
+ */
+ private static Constructor findCompatibleConstructor(List<Object> args, Class target) throws NoSuchMethodException {
+ Constructor retval = null;
+ int eligibleCount = 0;
+
+ LOG.debug("Target class: {}, constructor args: {}", target.getName(), args);
+ Constructor[] cons = target.getDeclaredConstructors();
+
+ for (Constructor con : cons) {
+ Class[] paramClasses = con.getParameterTypes();
+ if (paramClasses.length == args.size()) {
+ LOG.debug("found constructor with same number of args..");
+ boolean invokable = canInvokeWithArgs(args, con.getParameterTypes());
+ if (invokable) {
+ retval = con;
+ eligibleCount++;
+ }
+ LOG.debug("** invokable --> {}", invokable);
+ } else {
+ LOG.debug("Skipping constructor with wrong number of arguments.");
+ }
+ }
+ if (eligibleCount > 1) {
+ LOG.warn("Found multiple invokable constructors for class {}, given arguments {}. Using the last one found.",
+ target, args);
+ }
+ return retval;
+ }
+
+
    /**
     * Invoke the `configMethods` declared in a bean definition against the
     * given instance. Reference arguments are resolved against the context
     * first; the target method is selected by name, arity and argument
     * compatibility, with list-to-array and primitive coercion applied.
     *
     * @param bean bean definition whose configMethods should be invoked
     * @param instance the target object
     * @param context used to resolve reference arguments
     * @throws IllegalArgumentException if no compatible method can be found
     */
    public static void invokeConfigMethods(ObjectDef bean, Object instance, ExecutionContext context)
            throws InvocationTargetException, IllegalAccessException {

        List<ConfigMethodDef> methodDefs = bean.getConfigMethods();
        if(methodDefs == null || methodDefs.size() == 0){
            return;
        }
        Class clazz = instance.getClass();
        for(ConfigMethodDef methodDef : methodDefs){
            List<Object> args = methodDef.getArgs();
            if (args == null){
                args = new ArrayList();
            }
            if(methodDef.hasReferences()){
                args = resolveReferences(args, context);
            }
            String methodName = methodDef.getName();
            Method method = findCompatibleMethod(args, clazz, methodName);
            if(method != null) {
                // coerce arguments (List -> array, boxed -> primitive) before invoking
                Object[] methodArgs = getArgsWithListCoercian(args, method.getParameterTypes());
                method.invoke(instance, methodArgs);
            } else {
                String msg = String.format("Unable to find configuration method '%s' in class '%s' with arguments %s.",
                        new Object[]{methodName, clazz.getName(), args});
                throw new IllegalArgumentException(msg);
            }
        }
    }
+
+ private static Method findCompatibleMethod(List<Object> args, Class target, String methodName){
+ Method retval = null;
+ int eligibleCount = 0;
+
+ LOG.debug("Target class: {}, methodName: {}, args: {}", target.getName(), methodName, args);
+ Method[] methods = target.getMethods();
+
+ for (Method method : methods) {
+ Class[] paramClasses = method.getParameterTypes();
+ if (paramClasses.length == args.size() && method.getName().equals(methodName)) {
+ LOG.debug("found constructor with same number of args..");
+ boolean invokable = false;
+ if (args.size() == 0){
+ // it's a method with zero args
+ invokable = true;
+ } else {
+ invokable = canInvokeWithArgs(args, method.getParameterTypes());
+ }
+ if (invokable) {
+ retval = method;
+ eligibleCount++;
+ }
+ LOG.debug("** invokable --> {}", invokable);
+ } else {
+ LOG.debug("Skipping method with wrong number of arguments.");
+ }
+ }
+ if (eligibleCount > 1) {
+ LOG.warn("Found multiple invokable methods for class {}, method {}, given arguments {}. " +
+ "Using the last one found.",
+ new Object[]{target, methodName, args});
+ }
+ return retval;
+ }
+
+ /**
+ * Given a java.util.List of contructor/method arguments, and a list of parameter types, attempt to convert the
+ * list to an java.lang.Object array that can be used to invoke the constructor. If an argument needs
+ * to be coerced from a List to an Array, do so.
+ */
+ private static Object[] getArgsWithListCoercian(List<Object> args, Class[] parameterTypes) {
+// Class[] parameterTypes = constructor.getParameterTypes();
+ if (parameterTypes.length != args.size()) {
+ throw new IllegalArgumentException("Contructor parameter count does not egual argument size.");
+ }
+ Object[] constructorParams = new Object[args.size()];
+
+ // loop through the arguments, if we hit a list that has to be convered to an array,
+ // perform the conversion
+ for (int i = 0; i < args.size(); i++) {
+ Object obj = args.get(i);
+ Class paramType = parameterTypes[i];
+ Class objectType = obj.getClass();
+ LOG.debug("Comparing parameter class {} to object class {} to see if assignment is possible.",
+ paramType, objectType);
+ if (paramType.equals(objectType)) {
+ LOG.debug("They are the same class.");
+ constructorParams[i] = args.get(i);
+ continue;
+ }
+ if (paramType.isAssignableFrom(objectType)) {
+ LOG.debug("Assignment is possible.");
+ constructorParams[i] = args.get(i);
+ continue;
+ }
+ if (isPrimitiveBoolean(paramType) && Boolean.class.isAssignableFrom(objectType)){
+ LOG.debug("Its a primitive boolean.");
+ Boolean bool = (Boolean)args.get(i);
+ constructorParams[i] = bool.booleanValue();
+ continue;
+ }
+ if(isPrimitiveNumber(paramType) && Number.class.isAssignableFrom(objectType)){
+ LOG.debug("Its a primitive number.");
+ Number num = (Number)args.get(i);
+ if(paramType == Float.TYPE){
+ constructorParams[i] = num.floatValue();
+ } else if (paramType == Double.TYPE) {
+ constructorParams[i] = num.doubleValue();
+ } else if (paramType == Long.TYPE) {
+ constructorParams[i] = num.longValue();
+ } else if (paramType == Integer.TYPE) {
+ constructorParams[i] = num.intValue();
+ } else if (paramType == Short.TYPE) {
+ constructorParams[i] = num.shortValue();
+ } else if (paramType == Byte.TYPE) {
+ constructorParams[i] = num.byteValue();
+ } else {
+ constructorParams[i] = args.get(i);
+ }
+ continue;
+ }
+
+ // enum conversion
+ if(paramType.isEnum() && objectType.equals(String.class)){
+ LOG.debug("Yes, will convert a String to enum");
+ constructorParams[i] = Enum.valueOf(paramType, (String)args.get(i));
+ continue;
+ }
+
+ // List to array conversion
+ if (paramType.isArray() && List.class.isAssignableFrom(objectType)) {
+ // TODO more collection content type checking
+ LOG.debug("Conversion appears possible...");
+ List list = (List) obj;
+ LOG.debug("Array Type: {}, List type: {}", paramType.getComponentType(), list.get(0).getClass());
+
+ // create an array of the right type
+ Object newArrayObj = Array.newInstance(paramType.getComponentType(), list.size());
+ for (int j = 0; j < list.size(); j++) {
+ Array.set(newArrayObj, j, list.get(j));
+
+ }
+ constructorParams[i] = newArrayObj;
+ LOG.debug("After conversion: {}", constructorParams[i]);
+ }
+ }
+ return constructorParams;
+ }
+
+
+ /**
+ * Determine if the given constructor/method parameter types are compatible given arguments List. Consider if
+ * list coercian can make it possible.
+ *
+ * @param args
+ * @param parameterTypes
+ * @return
+ */
+ private static boolean canInvokeWithArgs(List<Object> args, Class[] parameterTypes) {
+ if (parameterTypes.length != args.size()) {
+ LOG.warn("parameter types were the wrong size");
+ return false;
+ }
+
+ for (int i = 0; i < args.size(); i++) {
+ Object obj = args.get(i);
+ if (obj == null) {
+ throw new IllegalArgumentException("argument shouldn't be null - index: " + i);
+ }
+ Class paramType = parameterTypes[i];
+ Class objectType = obj.getClass();
+ LOG.debug("Comparing parameter class {} to object class {} to see if assignment is possible.",
+ paramType, objectType);
+ if (paramType.equals(objectType)) {
+ LOG.debug("Yes, they are the same class.");
+ } else if (paramType.isAssignableFrom(objectType)) {
+ LOG.debug("Yes, assignment is possible.");
+ } else if (isPrimitiveBoolean(paramType) && Boolean.class.isAssignableFrom(objectType)){
+ LOG.debug("Yes, assignment is possible.");
+ } else if(isPrimitiveNumber(paramType) && Number.class.isAssignableFrom(objectType)){
+ LOG.debug("Yes, assignment is possible.");
+ } else if(paramType.isEnum() && objectType.equals(String.class)){
+ LOG.debug("Yes, will convert a String to enum");
+ } else if (paramType.isArray() && List.class.isAssignableFrom(objectType)) {
+ // TODO more collection content type checking
+ LOG.debug("Assignment is possible if we convert a List to an array.");
+ LOG.debug("Array Type: {}, List type: {}", paramType.getComponentType(), ((List) obj).get(0).getClass());
+ } else {
+ return false;
+ }
+ }
+ return true;
+ }
+
+ public static boolean isPrimitiveNumber(Class clazz){
+ return clazz.isPrimitive() && !clazz.equals(boolean.class);
+ }
+
+ public static boolean isPrimitiveBoolean(Class clazz){
+ return clazz.isPrimitive() && clazz.equals(boolean.class);
+ }
+}
+
http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/flux/flux-core/src/main/java/org/apache/storm/flux/api/TopologySource.java
----------------------------------------------------------------------
diff --git a/flux/flux-core/src/main/java/org/apache/storm/flux/api/TopologySource.java b/flux/flux-core/src/main/java/org/apache/storm/flux/api/TopologySource.java
new file mode 100644
index 0000000..2777854
--- /dev/null
+++ b/flux/flux-core/src/main/java/org/apache/storm/flux/api/TopologySource.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.flux.api;
+
+
+import org.apache.storm.generated.StormTopology;
+
+import java.util.Map;
+
+/**
+ * Marker interface for objects that can produce `StormTopology` objects.
+ *
+ * If a `topology-source` class implements the `getTopology()` method, Flux will
+ * call that method. Otherwise, it will introspect the given class and look for a
+ * similar method that produces a `StormTopology` instance.
+ *
+ * Note that it is not strictly necessary for a class to implement this interface.
+ * If a class defines a method with a similar signature, Flux should be able to find
+ * and invoke it.
+ *
+ */
+public interface TopologySource {
+ public StormTopology getTopology(Map<String, Object> config);
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/flux/flux-core/src/main/java/org/apache/storm/flux/model/BeanDef.java
----------------------------------------------------------------------
diff --git a/flux/flux-core/src/main/java/org/apache/storm/flux/model/BeanDef.java b/flux/flux-core/src/main/java/org/apache/storm/flux/model/BeanDef.java
new file mode 100644
index 0000000..f0247ed
--- /dev/null
+++ b/flux/flux-core/src/main/java/org/apache/storm/flux/model/BeanDef.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.flux.model;
+
+/**
+ * A representation of a Java object that is uniquely identifyable, and given a className, constructor arguments,
+ * and properties, can be instantiated.
+ */
+public class BeanDef extends ObjectDef {
+ private String id;
+
+ public String getId() {
+ return id;
+ }
+
+ public void setId(String id) {
+ this.id = id;
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/flux/flux-core/src/main/java/org/apache/storm/flux/model/BeanListReference.java
----------------------------------------------------------------------
diff --git a/flux/flux-core/src/main/java/org/apache/storm/flux/model/BeanListReference.java b/flux/flux-core/src/main/java/org/apache/storm/flux/model/BeanListReference.java
new file mode 100644
index 0000000..652210c
--- /dev/null
+++ b/flux/flux-core/src/main/java/org/apache/storm/flux/model/BeanListReference.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.flux.model;
+
+import java.util.List;
+
/**
 * A bean list reference is a list of identifiers that each point to another bean.
 */
public class BeanListReference {
    // Public field: presumably populated directly by the YAML mapper — kept
    // public for backward compatibility.
    public List<String> ids;

    /** Default constructor, required for bean-style instantiation. */
    public BeanListReference(){}

    /**
     * Creates a reference to the beans with the given ids.
     *
     * @param ids identifiers of the referenced beans
     */
    public BeanListReference(List<String> ids){
        this.ids = ids;
    }

    /** @return the ids of the referenced beans */
    public List<String> getIds() {
        return ids;
    }

    /**
     * Updates the ids of the referenced beans.
     * Added for symmetry with {@link BeanReference#setId(String)} and to make
     * the class a conventional getter/setter bean.
     */
    public void setIds(List<String> ids) {
        this.ids = ids;
    }
}
http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/flux/flux-core/src/main/java/org/apache/storm/flux/model/BeanReference.java
----------------------------------------------------------------------
diff --git a/flux/flux-core/src/main/java/org/apache/storm/flux/model/BeanReference.java b/flux/flux-core/src/main/java/org/apache/storm/flux/model/BeanReference.java
new file mode 100644
index 0000000..bd236f1
--- /dev/null
+++ b/flux/flux-core/src/main/java/org/apache/storm/flux/model/BeanReference.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.flux.model;
+
/**
 * A bean reference is a lightweight pointer to another definition: it carries
 * nothing but the id of the bean it refers to.
 */
public class BeanReference {
    /** Identifier of the referenced bean. Public, matching the original contract. */
    public String id;

    /** Default constructor, required for bean-style instantiation. */
    public BeanReference() {
    }

    /**
     * Creates a reference to the bean with the given id.
     *
     * @param id identifier of the referenced bean
     */
    public BeanReference(String id) {
        this.id = id;
    }

    /** @return the id of the referenced bean */
    public String getId() {
        return id;
    }

    /** Updates the id of the referenced bean. */
    public void setId(String id) {
        this.id = id;
    }
}
http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/flux/flux-core/src/main/java/org/apache/storm/flux/model/BoltDef.java
----------------------------------------------------------------------
diff --git a/flux/flux-core/src/main/java/org/apache/storm/flux/model/BoltDef.java b/flux/flux-core/src/main/java/org/apache/storm/flux/model/BoltDef.java
new file mode 100644
index 0000000..362abf1
--- /dev/null
+++ b/flux/flux-core/src/main/java/org/apache/storm/flux/model/BoltDef.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.flux.model;
+
/**
 * Bean representation of a Storm bolt.
 *
 * Declares no members of its own; all state and behavior come from
 * {@link VertexDef}. NOTE(review): presumably exists so bolt definitions can
 * be distinguished by type from other vertices (e.g. spouts) when a topology
 * definition is parsed — confirm against the topology builder.
 */
public class BoltDef extends VertexDef {
}
http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/flux/flux-core/src/main/java/org/apache/storm/flux/model/ConfigMethodDef.java
----------------------------------------------------------------------
diff --git a/flux/flux-core/src/main/java/org/apache/storm/flux/model/ConfigMethodDef.java b/flux/flux-core/src/main/java/org/apache/storm/flux/model/ConfigMethodDef.java
new file mode 100644
index 0000000..69cabc3
--- /dev/null
+++ b/flux/flux-core/src/main/java/org/apache/storm/flux/model/ConfigMethodDef.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.flux.model;
+
+import java.util.ArrayList;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+
+public class ConfigMethodDef {
+ private String name;
+ private List<Object> args;
+ private boolean hasReferences = false;
+
+ public String getName() {
+ return name;
+ }
+
+ public void setName(String name) {
+ this.name = name;
+ }
+
+ public List<Object> getArgs() {
+ return args;
+ }
+
+ public void setArgs(List<Object> args) {
+
+ List<Object> newVal = new ArrayList<Object>();
+ for(Object obj : args){
+ if(obj instanceof LinkedHashMap){
+ Map map = (Map)obj;
+ if(map.containsKey("ref") && map.size() == 1){
+ newVal.add(new BeanReference((String)map.get("ref")));
+ this.hasReferences = true;
+ } else if (map.containsKey("reflist") && map.size() == 1) {
+ newVal.add(new BeanListReference((List<String>) map.get("reflist")));
+ this.hasReferences = true;
+ } else {
+ newVal.add(obj);
+ }
+ } else {
+ newVal.add(obj);
+ }
+ }
+ this.args = newVal;
+ }
+
+ public boolean hasReferences(){
+ return this.hasReferences;
+ }
+}