You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by ka...@apache.org on 2017/04/05 23:19:13 UTC

[10/23] storm git commit: STORM-2453 Move non-connectors into the top directory

http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/flux/README.md
----------------------------------------------------------------------
diff --git a/flux/README.md b/flux/README.md
new file mode 100644
index 0000000..206ae22
--- /dev/null
+++ b/flux/README.md
@@ -0,0 +1,872 @@
+# flux
+A framework for creating and deploying Apache Storm streaming computations with less friction.
+
+## Definition
+**flux** |fləks| _noun_
+
+1. The action or process of flowing or flowing out
+2. Continuous change
+3. In physics, the rate of flow of a fluid, radiant energy, or particles across a given area
+4. A substance mixed with a solid to lower its melting point
+
+## Rationale
+Bad things happen when configuration is hard-coded. No one should have to recompile or repackage an application in
+order to change configuration.
+
+## About
+Flux is a framework and set of utilities that make defining and deploying Apache Storm topologies less painful and
+developer-intensive.
+
+Have you ever found yourself repeating this pattern?:
+
+```java
+
+public static void main(String[] args) throws Exception {
+    // logic to determine if we're running locally or not...
+    // create necessary config options...
+    boolean runLocal = shouldRunLocal();
+    if(runLocal){
+        LocalCluster cluster = new LocalCluster();
+        cluster.submitTopology(name, conf, topology);
+    } else {
+        StormSubmitter.submitTopology(name, conf, topology);
+    }
+}
+```
+
+Wouldn't something like this be easier:
+
+```bash
+storm jar mytopology.jar org.apache.storm.flux.Flux --local config.yaml
+```
+
+or:
+
+```bash
+storm jar mytopology.jar org.apache.storm.flux.Flux --remote config.yaml
+```
+
+Another pain point often mentioned is the fact that the wiring for a Topology graph is often tied up in Java code,
+and that any changes require recompilation and repackaging of the topology jar file. Flux aims to alleviate that
+pain by allowing you to package all your Storm components in a single jar, and use an external text file to define
+the layout and configuration of your topologies.
+
+## Features
+
+ * Easily configure and deploy Storm topologies (Both Storm core and Microbatch API) without embedding configuration
+   in your topology code
+ * Support for existing topology code (see below)
+ * Define Storm Core API (Spouts/Bolts) using a flexible YAML DSL
+ * YAML DSL support for most Storm components (storm-kafka, storm-hdfs, storm-hbase, etc.)
+ * Convenient support for multi-lang components
+ * External property substitution/filtering for easily switching between configurations/environments (similar to Maven-style
+   `${variable.name}` substitution)
+
+## Usage
+
+To use Flux, add it as a dependency and package all your Storm components in a fat jar, then create a YAML document
+to define your topology (see below for YAML configuration options).
+
+### Building from Source
+The easiest way to use Flux is to add it as a Maven dependency in your project as described below.
+
+If you would like to build Flux from source and run the unit/integration tests, you will need the following installed
+on your system:
+
+* Python 2.6.x or later
+* Node.js 0.10.x or later
+
+#### Building with unit tests enabled:
+
+```
+mvn clean install
+```
+
+#### Building with unit tests disabled:
+If you would like to build Flux without installing Python or Node.js you can simply skip the unit tests:
+
+```
+mvn clean install -DskipTests=true
+```
+
+Note that if you plan on using Flux to deploy topologies to a remote cluster, you will still need to have Python
+installed since it is required by Apache Storm.
+
+
+#### Building with integration tests enabled:
+
+```
+mvn clean install -DskipIntegration=false
+```
+
+
+### Packaging with Maven
+To enable Flux for your Storm components, you need to add it as a dependency such that it's included in the Storm
+topology jar. This can be accomplished with the Maven shade plugin (preferred) or the Maven assembly plugin (not
+recommended).
+
+#### Flux Maven Dependency
+The current version of Flux is available in Maven Central at the following coordinates:
+```xml
+<dependency>
+    <groupId>org.apache.storm</groupId>
+    <artifactId>flux-core</artifactId>
+    <version>${storm.version}</version>
+</dependency>
+```
+
+Using shell spouts and bolts requires the additional Flux Wrappers library:
+```xml
+<dependency>
+    <groupId>org.apache.storm</groupId>
+    <artifactId>flux-wrappers</artifactId>
+    <version>${storm.version}</version>
+</dependency>
+```
+
+#### Creating a Flux-Enabled Topology JAR
+The example below illustrates Flux usage with the Maven shade plugin:
+
+ ```xml
+<!-- include Flux and user dependencies in the shaded jar -->
+<dependencies>
+    <!-- Flux include -->
+    <dependency>
+        <groupId>org.apache.storm</groupId>
+        <artifactId>flux-core</artifactId>
+        <version>${storm.version}</version>
+    </dependency>
+    <!-- Flux Wrappers include -->
+    <dependency>
+        <groupId>org.apache.storm</groupId>
+        <artifactId>flux-wrappers</artifactId>
+        <version>${storm.version}</version>
+    </dependency>
+
+    <!-- add user dependencies here... -->
+
+</dependencies>
+<!-- create a fat jar that includes all dependencies -->
+<build>
+    <plugins>
+        <plugin>
+            <groupId>org.apache.maven.plugins</groupId>
+            <artifactId>maven-shade-plugin</artifactId>
+            <version>1.4</version>
+            <configuration>
+                <createDependencyReducedPom>true</createDependencyReducedPom>
+            </configuration>
+            <executions>
+                <execution>
+                    <phase>package</phase>
+                    <goals>
+                        <goal>shade</goal>
+                    </goals>
+                    <configuration>
+                        <transformers>
+                            <transformer
+                                    implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
+                            <transformer
+                                    implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
+                                <mainClass>org.apache.storm.flux.Flux</mainClass>
+                            </transformer>
+                        </transformers>
+                    </configuration>
+                </execution>
+            </executions>
+        </plugin>
+    </plugins>
+</build>
+ ```
+
+### Deploying and Running a Flux Topology
+Once your topology components are packaged with the Flux dependency, you can run different topologies either locally
+or remotely using the `storm jar` command. For example, if your fat jar is named `myTopology-0.1.0-SNAPSHOT.jar` you
+could run it locally with the command:
+
+
+```bash
+storm jar myTopology-0.1.0-SNAPSHOT.jar org.apache.storm.flux.Flux --local my_config.yaml
+
+```
+
+### Command line options
+```
+usage: storm jar <my_topology_uber_jar.jar> org.apache.storm.flux.Flux
+             [options] <topology-config.yaml>
+ -d,--dry-run                 Do not run or deploy the topology. Just
+                              build, validate, and print information about
+                              the topology.
+ -e,--env-filter              Perform environment variable substitution.
+                              Replace keys identified with `${ENV-[NAME]}`
+                              will be replaced with the corresponding
+                              `NAME` environment value
+ -f,--filter <file>           Perform property substitution. Use the
+                              specified file as a source of properties,
+                              and replace keys identified with {$[property
+                              name]} with the value defined in the
+                              properties file.
+ -i,--inactive                Deploy the topology, but do not activate it.
+ -l,--local                   Run the topology in local mode.
+ -n,--no-splash               Suppress the printing of the splash screen.
+ -q,--no-detail               Suppress the printing of topology details.
+ -r,--remote                  Deploy the topology to a remote cluster.
+ -R,--resource                Treat the supplied path as a classpath
+                              resource instead of a file.
+ -s,--sleep <ms>              When running locally, the amount of time to
+                              sleep (in ms.) before killing the topology
+                              and shutting down the local cluster.
+ -z,--zookeeper <host:port>   When running in local mode, use the
+                              ZooKeeper at the specified <host>:<port>
+                              instead of the in-process ZooKeeper.
+                              (requires Storm 0.9.3 or later)
+```
+
+**NOTE:** Flux tries to avoid command line switch collision with the `storm` command, and allows any other command line
+switches to pass through to the `storm` command.
+
+For example, you can use the `storm` command switch `-c` to override a topology configuration property. The following
+example command will run Flux and override the `nimbus.host` configuration:
+
+```bash
+storm jar myTopology-0.1.0-SNAPSHOT.jar org.apache.storm.flux.Flux --remote my_config.yaml -c nimbus.host=localhost
+```
+
+### Sample output
+```
+███████╗██╗     ██╗   ██╗██╗  ██╗
+██╔════╝██║     ██║   ██║╚██╗██╔╝
+█████╗  ██║     ██║   ██║ ╚███╔╝
+██╔══╝  ██║     ██║   ██║ ██╔██╗
+██║     ███████╗╚██████╔╝██╔╝ ██╗
+╚═╝     ╚══════╝ ╚═════╝ ╚═╝  ╚═╝
++-         Apache Storm        -+
++-  data FLow User eXperience  -+
+Version: 0.3.0
+Parsing file: /Users/hsimpson/Projects/donut_domination/storm/shell_test.yaml
+---------- TOPOLOGY DETAILS ----------
+Name: shell-topology
+--------------- SPOUTS ---------------
+sentence-spout[1](org.apache.storm.flux.wrappers.spouts.FluxShellSpout)
+---------------- BOLTS ---------------
+splitsentence[1](org.apache.storm.flux.wrappers.bolts.FluxShellBolt)
+log[1](org.apache.storm.flux.wrappers.bolts.LogInfoBolt)
+count[1](org.apache.storm.testing.TestWordCounter)
+--------------- STREAMS ---------------
+sentence-spout --SHUFFLE--> splitsentence
+splitsentence --FIELDS--> count
+count --SHUFFLE--> log
+--------------------------------------
+Submitting topology: 'shell-topology' to remote cluster...
+```
+
+## YAML Configuration
+Flux topologies are defined in a YAML file that describes a topology. A Flux topology
+definition consists of the following:
+
+  1. A topology name
+  2. A list of topology "components" (named Java objects that will be made available in the environment)
+  3. **EITHER** (A DSL topology definition):
+      * A list of spouts, each identified by a unique ID
+      * A list of bolts, each identified by a unique ID
+      * A list of "stream" objects representing a flow of tuples between spouts and bolts
+  4. **OR** (A JVM class that can produce a `org.apache.storm.generated.StormTopology` instance):
+      * A `topologySource` definition.
+
+
+
+For example, here is a simple definition of a wordcount topology using the YAML DSL:
+
+```yaml
+name: "yaml-topology"
+config:
+  topology.workers: 1
+
+# spout definitions
+spouts:
+  - id: "spout-1"
+    className: "org.apache.storm.testing.TestWordSpout"
+    parallelism: 1
+
+# bolt definitions
+bolts:
+  - id: "bolt-1"
+    className: "org.apache.storm.testing.TestWordCounter"
+    parallelism: 1
+  - id: "bolt-2"
+    className: "org.apache.storm.flux.wrappers.bolts.LogInfoBolt"
+    parallelism: 1
+
+#stream definitions
+streams:
+  - name: "spout-1 --> bolt-1" # name isn't used (placeholder for logging, UI, etc.)
+    from: "spout-1"
+    to: "bolt-1"
+    grouping:
+      type: FIELDS
+      args: ["word"]
+
+  - name: "bolt-1 --> bolt2"
+    from: "bolt-1"
+    to: "bolt-2"
+    grouping:
+      type: SHUFFLE
+
+
+```
+## Property Substitution/Filtering
+It's common for developers to want to easily switch between configurations, for example switching deployment between
+a development environment and a production environment. This can be accomplished by using separate YAML configuration
+files, but that approach would lead to unnecessary duplication, especially in situations where the Storm topology
+does not change, but configuration settings such as host names, ports, and parallelism parameters do.
+
+For this case, Flux offers properties filtering to allow you to externalize values to a `.properties` file and have
+them substituted before the `.yaml` file is parsed.
+
+To enable property filtering, use the `--filter` command line option and specify a `.properties` file. For example,
+if you invoked flux like so:
+
+```bash
+storm jar myTopology-0.1.0-SNAPSHOT.jar org.apache.storm.flux.Flux --local my_config.yaml --filter dev.properties
+```
+With the following `dev.properties` file:
+
+```properties
+kafka.zookeeper.hosts: localhost:2181
+```
+
+You would then be able to reference those properties by key in your `.yaml` file using `${}` syntax:
+
+```yaml
+  - id: "zkHosts"
+    className: "org.apache.storm.kafka.ZkHosts"
+    constructorArgs:
+      - "${kafka.zookeeper.hosts}"
+```
+
+In this case, Flux would replace `${kafka.zookeeper.hosts}` with `localhost:2181` before parsing the YAML contents.
+
+### Environment Variable Substitution/Filtering
+Flux also allows environment variable substitution. For example, if an environment variable named `ZK_HOSTS` is defined,
+you can reference it in a Flux YAML file with the following syntax:
+
+```
+${ENV-ZK_HOSTS}
+```
+
+## Components
+Components are essentially named object instances that are made available as configuration options for spouts and
+bolts. If you are familiar with the Spring framework, components are roughly analogous to Spring beans.
+
+Every component is identified, at a minimum, by a unique identifier (String) and a class name (String). For example,
+the following will make an instance of the `org.apache.storm.kafka.StringScheme` class available as a reference under the key
+`"stringScheme"` . This assumes the `org.apache.storm.kafka.StringScheme` has a default constructor.
+
+```yaml
+components:
+  - id: "stringScheme"
+    className: "org.apache.storm.kafka.StringScheme"
+```
+
+### Constructor Arguments, References, Properties and Configuration Methods
+
+#### Constructor Arguments
+Arguments to a class constructor can be configured by adding a `constructorArgs` element to a component.
+`constructorArgs` is a list of objects that will be passed to the class' constructor. The following example creates an
+object by calling the constructor that takes a single string as an argument:
+
+```yaml
+  - id: "zkHosts"
+    className: "org.apache.storm.kafka.ZkHosts"
+    constructorArgs:
+      - "localhost:2181"
+      - true
+```
+
+#### References
+Each component instance is identified by a unique id that allows it to be used/reused by other components. To
+reference an existing component, you specify the id of the component with the `ref` tag.
+
+In the following example, a component with the id `"stringScheme"` is created, and later referenced, as an argument
+to another component's constructor:
+
+```yaml
+components:
+  - id: "stringScheme"
+    className: "org.apache.storm.kafka.StringScheme"
+
+  - id: "stringMultiScheme"
+    className: "org.apache.storm.spout.SchemeAsMultiScheme"
+    constructorArgs:
+      - ref: "stringScheme" # component with id "stringScheme" must be declared above.
+```
+
+You can also reference a list of existing components by specifying their ids with the `reflist` tag.
+The type of the reflist will be `List<Object>`, but Flux can automatically convert a List to an array (including varargs),
+so you can use `reflist` for an argument whose type is `List<Type>`, `Type[]`, or `Type...`.
+
+Please note that all components in the list must be of the same type.
+
+```yaml
+components:
+  - id: "boundCQLStatementMapperBuilder"
+    className: "org.apache.storm.cassandra.query.builder.BoundCQLStatementMapperBuilder"
+    constructorArgs:
+    - "INSERT INTO sink_cassandra (eventKey, driverId, truckId, driverName) VALUES (?, ?, ?, ?)"
+    configMethods:
+    - name: "bind"
+      args:
+      - reflist: ["FieldSelector-1", "FieldSelector-2", "FieldSelector-3", "FieldSelector-4"]
+```
+
+**N.B.:** References can only be used after (below) the object they point to has been declared.
+
+#### Properties
+In addition to calling constructors with different arguments, Flux also allows you to configure components using
+JavaBean-like setter methods and fields declared as `public`:
+
+```yaml
+  - id: "spoutConfig"
+    className: "org.apache.storm.kafka.SpoutConfig"
+    constructorArgs:
+      # brokerHosts
+      - ref: "zkHosts"
+      # topic
+      - "myKafkaTopic"
+      # zkRoot
+      - "/kafkaSpout"
+      # id
+      - "myId"
+    properties:
+      - name: "ignoreZkOffsets"
+        value: true
+      - name: "scheme"
+        ref: "stringMultiScheme"
+```
+
+In the example above, the `properties` declaration will cause Flux to look for a public method in the `SpoutConfig` with
+the signature `setIgnoreZkOffsets(boolean b)` and attempt to invoke it. If a setter method is not found, Flux will then
+look for a public instance variable with the name `ignoreZkOffsets` and attempt to set its value.
+
+References may also be used as property values.
+
+#### Configuration Methods
+Conceptually, configuration methods are similar to Properties and Constructor Args -- they allow you to invoke an
+arbitrary method on an object after it is constructed. Configuration methods are useful for working with classes that
+don't expose JavaBean methods or have constructors that can fully configure the object. Common examples include classes
+that use the builder pattern for configuration/composition.
+
+The following YAML example creates a bolt and configures it by calling several methods:
+
+```yaml
+bolts:
+  - id: "bolt-1"
+    className: "org.apache.storm.flux.test.TestBolt"
+    parallelism: 1
+    configMethods:
+      - name: "withFoo"
+        args:
+          - "foo"
+      - name: "withNone"
+      # no args needed, so no "args" line
+      - name: "withBar"
+        args:
+          - "bar"
+      - name: "withFooBar"
+        args:
+          - "foo"
+          - "bar"
+```
+
+The signatures of the corresponding methods are as follows:
+
+```java
+    public void withFoo(String foo);
+    public void withNone(); // method with zero arguments
+    public void withBar(String bar);
+    public void withFooBar(String foo, String bar);
+```
+
+Arguments passed to configuration methods work much the same way as constructor arguments, and support references as
+well.
+
+### Using Java `enum`s in Constructor Arguments, References, Properties and Configuration Methods
+You can easily use Java `enum` values as arguments in a Flux YAML file, simply by referencing the name of the `enum`.
+
+For example, Storm's HDFS module includes the following `enum` definition (simplified for brevity):
+
+```java
+public static enum Units {
+    KB, MB, GB, TB
+}
+```
+
+And the `org.apache.storm.hdfs.bolt.rotation.FileSizeRotationPolicy` class has the following constructor:
+
+```java
+public FileSizeRotationPolicy(float count, Units units)
+
+```
+The following Flux `component` definition could be used to call the constructor:
+
+```yaml
+  - id: "rotationPolicy"
+    className: "org.apache.storm.hdfs.bolt.rotation.FileSizeRotationPolicy"
+    constructorArgs:
+      - 5.0
+      - MB
+```
+
+The above definition is functionally equivalent to the following Java code:
+
+```java
+// rotate files when they reach 5MB
+FileRotationPolicy rotationPolicy = new FileSizeRotationPolicy(5.0f, Units.MB);
+```
+
+## Topology Config
+The `config` section is simply a map of Storm topology configuration parameters that will be passed to the
+`org.apache.storm.StormSubmitter` as an instance of the `org.apache.storm.Config` class:
+
+```yaml
+config:
+  topology.workers: 4
+  topology.max.spout.pending: 1000
+  topology.message.timeout.secs: 30
+```
+
+# Existing Topologies
+If you have existing Storm topologies, you can still use Flux to deploy/run/test them. This feature allows you to
+leverage Flux Constructor Arguments, References, Properties, and Topology Config declarations for existing topology
+classes.
+
+The easiest way to use an existing topology class is to define
+a `getTopology()` instance method with one of the following signatures:
+
+```java
+public StormTopology getTopology(Map<String, Object> config)
+```
+or:
+
+```java
+public StormTopology getTopology(Config config)
+```
+
+You could then use the following YAML to configure your topology:
+
+```yaml
+name: "existing-topology"
+topologySource:
+  className: "org.apache.storm.flux.test.SimpleTopology"
+```
+
+If the class you would like to use as a topology source has a different method name (i.e. not `getTopology`), you can
+override it:
+
+```yaml
+name: "existing-topology"
+topologySource:
+  className: "org.apache.storm.flux.test.SimpleTopology"
+  methodName: "getTopologyWithDifferentMethodName"
+```
+
+__N.B.:__ The specified method must accept a single argument of type `java.util.Map<String, Object>` or
+`org.apache.storm.Config`, and return a `org.apache.storm.generated.StormTopology` object.
+
+# YAML DSL
+## Spouts and Bolts
+Spouts and Bolts are configured in their own respective sections of the YAML configuration. Spout and Bolt definitions
+are extensions to the `component` definition that add a `parallelism` parameter that sets the parallelism for a
+component when the topology is deployed.
+
+Because spout and bolt definitions extend `component` they support constructor arguments, references, and properties as
+well.
+
+Shell spout example:
+
+```yaml
+spouts:
+  - id: "sentence-spout"
+    className: "org.apache.storm.flux.wrappers.spouts.FluxShellSpout"
+    # shell spout constructor takes 2 arguments: String[], String[]
+    constructorArgs:
+      # command line
+      - ["node", "randomsentence.js"]
+      # output fields
+      - ["word"]
+    parallelism: 1
+```
+
+Kafka spout example:
+
+```yaml
+components:
+  - id: "stringScheme"
+    className: "org.apache.storm.kafka.StringScheme"
+
+  - id: "stringMultiScheme"
+    className: "org.apache.storm.spout.SchemeAsMultiScheme"
+    constructorArgs:
+      - ref: "stringScheme"
+
+  - id: "zkHosts"
+    className: "org.apache.storm.kafka.ZkHosts"
+    constructorArgs:
+      - "localhost:2181"
+
+# Alternative kafka config
+#  - id: "kafkaConfig"
+#    className: "org.apache.storm.kafka.KafkaConfig"
+#    constructorArgs:
+#      # brokerHosts
+#      - ref: "zkHosts"
+#      # topic
+#      - "myKafkaTopic"
+#      # clientId (optional)
+#      - "myKafkaClientId"
+
+  - id: "spoutConfig"
+    className: "org.apache.storm.kafka.SpoutConfig"
+    constructorArgs:
+      # brokerHosts
+      - ref: "zkHosts"
+      # topic
+      - "myKafkaTopic"
+      # zkRoot
+      - "/kafkaSpout"
+      # id
+      - "myId"
+    properties:
+      - name: "ignoreZkOffsets"
+        value: true
+      - name: "scheme"
+        ref: "stringMultiScheme"
+
+config:
+  topology.workers: 1
+
+# spout definitions
+spouts:
+  - id: "kafka-spout"
+    className: "org.apache.storm.kafka.KafkaSpout"
+    constructorArgs:
+      - ref: "spoutConfig"
+
+```
+
+Bolt Examples:
+
+```yaml
+# bolt definitions
+bolts:
+  - id: "splitsentence"
+    className: "org.apache.storm.flux.wrappers.bolts.FluxShellBolt"
+    constructorArgs:
+      # command line
+      - ["python", "splitsentence.py"]
+      # output fields
+      - ["word"]
+    parallelism: 1
+    # ...
+
+  - id: "log"
+    className: "org.apache.storm.flux.wrappers.bolts.LogInfoBolt"
+    parallelism: 1
+    # ...
+
+  - id: "count"
+    className: "org.apache.storm.testing.TestWordCounter"
+    parallelism: 1
+    # ...
+```
+## Streams and Stream Groupings
+Streams in Flux are represented as a list of connections (Graph edges, data flow, etc.) between the Spouts and Bolts in
+a topology, with an associated Grouping definition.
+
+A Stream definition has the following properties:
+
+**`name`:** A name for the connection (optional, currently unused)
+
+**`from`:** The `id` of a Spout or Bolt that is the source (publisher)
+
+**`to`:** The `id` of a Spout or Bolt that is the destination (subscriber)
+
+**`grouping`:** The stream grouping definition for the Stream
+
+A Grouping definition has the following properties:
+
+**`type`:** The type of grouping. One of `ALL`,`CUSTOM`,`DIRECT`,`SHUFFLE`,`LOCAL_OR_SHUFFLE`,`FIELDS`,`GLOBAL`, or `NONE`.
+
+**`streamId`:** The Storm stream ID (Optional. If unspecified will use the default stream)
+
+**`args`:** For the `FIELDS` grouping, a list of field names.
+
+**`customClass`:** For the `CUSTOM` grouping, a definition of the custom grouping class instance
+
+The `streams` definition example below sets up a topology with the following wiring:
+
+```
+    kafka-spout --> splitsentence --> count --> log
+```
+
+
+```yaml
+#stream definitions
+# stream definitions define connections between spouts and bolts.
+# note that such connections can be cyclical
+# custom stream groupings are also supported
+
+streams:
+  - name: "kafka --> split" # name isn't used (placeholder for logging, UI, etc.)
+    from: "kafka-spout"
+    to: "splitsentence"
+    grouping:
+      type: SHUFFLE
+
+  - name: "split --> count"
+    from: "splitsentence"
+    to: "count"
+    grouping:
+      type: FIELDS
+      args: ["word"]
+
+  - name: "count --> log"
+    from: "count"
+    to: "log"
+    grouping:
+      type: SHUFFLE
+```
+
+### Custom Stream Groupings
+Custom stream groupings are defined by setting the grouping type to `CUSTOM` and defining a `customClass` parameter
+that tells Flux how to instantiate the custom class. The `customClass` definition extends `component`, so it supports
+constructor arguments, references, and properties as well.
+
+The example below creates a Stream with an instance of the `org.apache.storm.testing.NGrouping` custom stream grouping
+class.
+
+```yaml
+  - name: "bolt-1 --> bolt2"
+    from: "bolt-1"
+    to: "bolt-2"
+    grouping:
+      type: CUSTOM
+      customClass:
+        className: "org.apache.storm.testing.NGrouping"
+        constructorArgs:
+          - 1
+```
+
+## Includes and Overrides
+Flux allows you to include the contents of other YAML files, and have them treated as though they were defined in the
+same file. Includes may be either files, or classpath resources.
+
+Includes are specified as a list of maps:
+
+```yaml
+includes:
+  - resource: false
+    file: "src/test/resources/configs/shell_test.yaml"
+    override: false
+```
+
+If the `resource` property is set to `true`, the include will be loaded as a classpath resource from the value of the
+`file` attribute, otherwise it will be treated as a regular file.
+
+The `override` property controls how includes affect the values defined in the current file. If `override` is set to
+`true`, values in the included file will replace values in the current file being parsed. If `override` is set to
+`false`, values in the current file being parsed will take precedence, and the parser will refuse to replace them.
+
+**N.B.:** Includes are not yet recursive. Includes from included files will be ignored.
+
+
+## Basic Word Count Example
+
+This example uses a spout implemented in JavaScript, a bolt implemented in Python, and a bolt implemented in Java.
+
+Topology YAML config:
+
+```yaml
+---
+name: "shell-topology"
+config:
+  topology.workers: 1
+
+# spout definitions
+spouts:
+  - id: "sentence-spout"
+    className: "org.apache.storm.flux.wrappers.spouts.FluxShellSpout"
+    # shell spout constructor takes 2 arguments: String[], String[]
+    constructorArgs:
+      # command line
+      - ["node", "randomsentence.js"]
+      # output fields
+      - ["word"]
+    parallelism: 1
+
+# bolt definitions
+bolts:
+  - id: "splitsentence"
+    className: "org.apache.storm.flux.wrappers.bolts.FluxShellBolt"
+    constructorArgs:
+      # command line
+      - ["python", "splitsentence.py"]
+      # output fields
+      - ["word"]
+    parallelism: 1
+
+  - id: "log"
+    className: "org.apache.storm.flux.wrappers.bolts.LogInfoBolt"
+    parallelism: 1
+
+  - id: "count"
+    className: "org.apache.storm.testing.TestWordCounter"
+    parallelism: 1
+
+#stream definitions
+# stream definitions define connections between spouts and bolts.
+# note that such connections can be cyclical
+# custom stream groupings are also supported
+
+streams:
+  - name: "spout --> split" # name isn't used (placeholder for logging, UI, etc.)
+    from: "sentence-spout"
+    to: "splitsentence"
+    grouping:
+      type: SHUFFLE
+
+  - name: "split --> count"
+    from: "splitsentence"
+    to: "count"
+    grouping:
+      type: FIELDS
+      args: ["word"]
+
+  - name: "count --> log"
+    from: "count"
+    to: "log"
+    grouping:
+      type: SHUFFLE
+```
+
+
+## Micro-Batching (Trident) API Support
+Currently, the Flux YAML DSL only supports the Core Storm API, but support for Storm's micro-batching API is planned.
+
+To use Flux with a Trident topology, define a topology getter method and reference it in your YAML config:
+
+```yaml
+name: "my-trident-topology"
+
+config:
+  topology.workers: 1
+
+topologySource:
+  className: "org.apache.storm.flux.test.TridentTopologySource"
+  # Flux will look for "getTopology", this will override that.
+  methodName: "getTopologyWithDifferentMethodName"
+```
+
+## Committer Sponsors
+
+ * P. Taylor Goetz ([ptgoetz@apache.org](mailto:ptgoetz@apache.org))

http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/flux/flux-core/pom.xml
----------------------------------------------------------------------
diff --git a/flux/flux-core/pom.xml b/flux/flux-core/pom.xml
new file mode 100644
index 0000000..1cf4a9b
--- /dev/null
+++ b/flux/flux-core/pom.xml
@@ -0,0 +1,112 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements.  See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License.  You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+
+    <parent>
+        <groupId>org.apache.storm</groupId>
+        <artifactId>flux</artifactId>
+        <version>2.0.0-SNAPSHOT</version>
+        <relativePath>../pom.xml</relativePath>
+    </parent>
+
+    <artifactId>flux-core</artifactId>
+    <packaging>jar</packaging>
+
+    <name>flux-core</name>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.storm</groupId>
+            <artifactId>flux-wrappers</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.storm</groupId>
+            <artifactId>storm-kafka</artifactId>
+            <version>${project.version}</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.storm</groupId>
+            <artifactId>storm-hdfs</artifactId>
+            <version>${project.version}</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.storm</groupId>
+            <artifactId>storm-hbase</artifactId>
+            <version>${project.version}</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.yaml</groupId>
+            <artifactId>snakeyaml</artifactId>
+        </dependency>
+    </dependencies>
+    <build>
+        <resources>
+            <resource>
+                <directory>src/main/resources</directory>
+                <filtering>true</filtering>
+            </resource>
+        </resources>
+        <plugins>
+        <plugin>
+            <groupId>org.apache.maven.plugins</groupId>
+            <artifactId>maven-shade-plugin</artifactId>
+            <configuration>
+                <createDependencyReducedPom>true</createDependencyReducedPom>
+                <filters>
+                    <filter>
+                        <artifact>*:*</artifact>
+                        <excludes>
+                            <exclude>META-INF/*.SF</exclude>
+                            <exclude>META-INF/*.sf</exclude>
+                            <exclude>META-INF/*.DSA</exclude>
+                            <exclude>META-INF/*.dsa</exclude>
+                            <exclude>META-INF/*.RSA</exclude>
+                            <exclude>META-INF/*.rsa</exclude>
+                            <exclude>META-INF/*.EC</exclude>
+                            <exclude>META-INF/*.ec</exclude>
+                            <exclude>META-INF/MSFTSIG.SF</exclude>
+                            <exclude>META-INF/MSFTSIG.RSA</exclude>
+                        </excludes>
+                    </filter>
+                </filters>
+            </configuration>
+            <executions>
+                <execution>
+                    <phase>package</phase>
+                    <goals>
+                        <goal>shade</goal>
+                    </goals>
+                    <configuration>
+                        <transformers>
+                            <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer" />
+                            <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
+                                <mainClass>org.apache.storm.flux.Flux</mainClass>
+                            </transformer>
+                        </transformers>
+                    </configuration>
+                </execution>
+            </executions>
+        </plugin>
+        </plugins>
+    </build>
+</project>

http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/flux/flux-core/src/main/java/org/apache/storm/flux/Flux.java
----------------------------------------------------------------------
diff --git a/flux/flux-core/src/main/java/org/apache/storm/flux/Flux.java b/flux/flux-core/src/main/java/org/apache/storm/flux/Flux.java
new file mode 100644
index 0000000..222bf2d
--- /dev/null
+++ b/flux/flux-core/src/main/java/org/apache/storm/flux/Flux.java
@@ -0,0 +1,259 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.flux;
+
+import org.apache.storm.Config;
+import org.apache.storm.LocalCluster;
+import org.apache.storm.LocalCluster.LocalTopology;
+import org.apache.storm.StormSubmitter;
+import org.apache.storm.generated.StormTopology;
+import org.apache.storm.generated.SubmitOptions;
+import org.apache.storm.generated.TopologyInitialStatus;
+import org.apache.storm.utils.Utils;
+import org.apache.commons.cli.*;
+import org.apache.storm.flux.model.*;
+import org.apache.storm.flux.parser.FluxParser;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.*;
+
+/**
+ * Flux entry point.
+ *
+ */
+public class Flux {
+    private static final Logger LOG = LoggerFactory.getLogger(Flux.class);
+
+    private static final Long DEFAULT_LOCAL_SLEEP_TIME = 60000l;
+
+    private static final Long DEFAULT_ZK_PORT = 2181l;
+
+    private static final String OPTION_LOCAL = "local";
+    private static final String OPTION_REMOTE = "remote";
+    private static final String OPTION_RESOURCE = "resource";
+    private static final String OPTION_SLEEP = "sleep";
+    private static final String OPTION_DRY_RUN = "dry-run";
+    private static final String OPTION_NO_DETAIL = "no-detail";
+    private static final String OPTION_NO_SPLASH = "no-splash";
+    private static final String OPTION_INACTIVE = "inactive";
+    private static final String OPTION_ZOOKEEPER = "zookeeper";
+    private static final String OPTION_FILTER = "filter";
+    private static final String OPTION_ENV_FILTER = "env-filter";
+
+    /**
+     * Command line entry point: registers the supported options, parses {@code args} and hands the
+     * result to {@code runCli}.
+     * <p>
+     * Exactly one positional argument (the topology definition) is expected. If the options cannot be
+     * parsed, or the number of positional arguments is wrong, usage information is printed and the JVM
+     * exits with status 1.
+     *
+     * @param args raw command line arguments
+     * @throws Exception if parsing the definition, building the topology, or running it fails
+     */
+    public static void main(String[] args) throws Exception {
+        Options options = new Options();
+
+        options.addOption(option(0, "l", OPTION_LOCAL, "Run the topology in local mode."));
+
+        options.addOption(option(0, "r", OPTION_REMOTE, "Deploy the topology to a remote cluster."));
+
+        options.addOption(option(0, "R", OPTION_RESOURCE, "Treat the supplied path as a classpath resource instead of a file."));
+
+        options.addOption(option(1, "s", OPTION_SLEEP, "ms", "When running locally, the amount of time to sleep (in ms.) " +
+                "before killing the topology and shutting down the local cluster."));
+
+        options.addOption(option(0, "d", OPTION_DRY_RUN, "Do not run or deploy the topology. Just build, validate, " +
+                "and print information about the topology."));
+
+        options.addOption(option(0, "q", OPTION_NO_DETAIL, "Suppress the printing of topology details."));
+
+        options.addOption(option(0, "n", OPTION_NO_SPLASH, "Suppress the printing of the splash screen."));
+
+        options.addOption(option(0, "i", OPTION_INACTIVE, "Deploy the topology, but do not activate it."));
+
+        options.addOption(option(1, "z", OPTION_ZOOKEEPER, "host:port", "When running in local mode, use the ZooKeeper at the " +
+                "specified <host>:<port> instead of the in-process ZooKeeper. (requires Storm 0.9.3 or later)"));
+
+        // placeholder syntax is ${...}; the previous help text showed the braces and dollar sign swapped
+        options.addOption(option(1, "f", OPTION_FILTER, "file", "Perform property substitution. Use the specified file " +
+                "as a source of properties, and replace keys identified with ${[property name]} with the value defined " +
+                "in the properties file."));
+
+        options.addOption(option(0, "e", OPTION_ENV_FILTER, "Perform environment variable substitution. Keys " +
+                "identified with `${ENV-[NAME]}` will be replaced with the corresponding `NAME` environment value."));
+
+        CommandLine cmd = null;
+        try {
+            cmd = new BasicParser().parse(options, args);
+        } catch (ParseException e) {
+            // treat an unparsable command line like a wrong argument count: explain, show usage, exit 1
+            System.err.println(e.getMessage());
+            usage(options);
+            System.exit(1);
+        }
+
+        if (cmd.getArgs().length != 1) {
+            usage(options);
+            System.exit(1);
+        }
+        runCli(cmd);
+    }
+
+    /**
+     * Creates an option whose argument name is the same as its long name.
+     *
+     * @param argCount number of arguments the option takes
+     * @param shortName short (single dash) option name
+     * @param longName long (double dash) option name, also used as the argument name
+     * @param description help text for the option
+     * @return the configured option
+     */
+    private static Option option(int argCount, String shortName, String longName, String description){
+        final String argName = longName;
+        return option(argCount, shortName, longName, argName, description);
+    }
+
+    /**
+     * Creates an option with an explicit argument name.
+     *
+     * @param argCount number of arguments the option takes
+     * @param shortName short (single dash) option name
+     * @param longName long (double dash) option name
+     * @param argName name shown for the option's argument in the help output
+     * @param description help text for the option
+     * @return the configured option
+     */
+    private static Option option(int argCount, String shortName, String longName, String argName, String description){
+        return OptionBuilder.hasArgs(argCount)
+                .withArgName(argName)
+                .withLongOpt(longName)
+                .withDescription(description)
+                .create(shortName);
+    }
+
+    /**
+     * Prints the command line syntax followed by the help text of every option in {@code options}.
+     *
+     * @param options the options to describe
+     */
+    private static void usage(Options options) {
+        String syntax = "storm jar <my_topology_uber_jar.jar> "
+                + Flux.class.getName()
+                + " [options] <topology-config.yaml>";
+        new HelpFormatter().printHelp(syntax, options);
+    }
+
+    /**
+     * Runs Flux for an already parsed command line.
+     * <p>
+     * Prints the splash screen unless suppressed, parses the topology definition named by the first
+     * positional argument (as a classpath resource or as a file), builds the topology, optionally prints
+     * its details and then, unless a dry run was requested, either submits it to a remote cluster or
+     * runs it in a local cluster for the configured sleep time before shutting that cluster down.
+     *
+     * @param cmd parsed command line
+     * @throws Exception if the definition cannot be parsed or the topology cannot be built or submitted
+     */
+    private static void runCli(CommandLine cmd)throws Exception {
+        if (!cmd.hasOption(OPTION_NO_SPLASH)) {
+            printSplash();
+        }
+
+        // NOTE(review): main() registers no "dump-yaml" option, so this presumably is always false - confirm
+        final boolean dumpYaml = cmd.hasOption("dump-yaml");
+        final String filePath = (String) cmd.getArgList().get(0);
+
+        // TODO conditionally load properties from a file or resource
+        final String filterProps = cmd.hasOption(OPTION_FILTER) ? cmd.getOptionValue(OPTION_FILTER) : null;
+        final boolean envFilter = cmd.hasOption(OPTION_ENV_FILTER);
+
+        final TopologyDef topologyDef;
+        if (cmd.hasOption(OPTION_RESOURCE)) {
+            printf("Parsing classpath resource: %s", filePath);
+            topologyDef = FluxParser.parseResource(filePath, dumpYaml, true, filterProps, envFilter);
+        } else {
+            printf("Parsing file: %s", new File(filePath).getAbsolutePath());
+            topologyDef = FluxParser.parseFile(filePath, dumpYaml, true, filterProps, envFilter);
+        }
+
+        String topologyName = topologyDef.getName();
+        // merge contents of `config` into topology config
+        Config conf = FluxBuilder.buildConfig(topologyDef);
+        ExecutionContext context = new ExecutionContext(topologyDef, conf);
+        StormTopology topology = FluxBuilder.buildTopology(context);
+
+        if (!cmd.hasOption(OPTION_NO_DETAIL)) {
+            printTopologyInfo(context);
+        }
+
+        // a dry run stops after building and (optionally) describing the topology
+        if (cmd.hasOption(OPTION_DRY_RUN)) {
+            return;
+        }
+
+        if (cmd.hasOption(OPTION_REMOTE)) {
+            LOG.info("Running remotely...");
+            // should the topology be active or inactive
+            final SubmitOptions submitOptions;
+            if (cmd.hasOption(OPTION_INACTIVE)) {
+                LOG.info("Deploying topology in an INACTIVE state...");
+                submitOptions = new SubmitOptions(TopologyInitialStatus.INACTIVE);
+            } else {
+                LOG.info("Deploying topology in an ACTIVE state...");
+                submitOptions = new SubmitOptions(TopologyInitialStatus.ACTIVE);
+            }
+            StormSubmitter.submitTopology(topologyName, conf, topology, submitOptions, null);
+            return;
+        }
+
+        LOG.info("Running in local mode...");
+
+        String sleepArg = cmd.getOptionValue(OPTION_SLEEP);
+        long sleepTime = (sleepArg == null) ? DEFAULT_LOCAL_SLEEP_TIME : Long.parseLong(sleepArg);
+        LOG.debug("Sleep time: {}", sleepTime);
+
+        // in-process or external zookeeper
+        LocalCluster cluster = null;
+        if (!cmd.hasOption(OPTION_ZOOKEEPER)) {
+            cluster = new LocalCluster();
+        } else {
+            String zkStr = cmd.getOptionValue(OPTION_ZOOKEEPER);
+            LOG.info("Using ZooKeeper at '{}' instead of in-process one.", zkStr);
+            // a bare host keeps the default port; "host:port" overrides it
+            String zkHost = zkStr;
+            long zkPort = DEFAULT_ZK_PORT;
+            if (zkStr.contains(":")) {
+                String[] hostPort = zkStr.split(":");
+                zkHost = hostPort[0];
+                if (hostPort.length > 1) {
+                    zkPort = Long.parseLong(hostPort[1]);
+                }
+            }
+            // the following constructor is only available in 0.9.3 and later
+            try {
+                cluster = new LocalCluster(zkHost, zkPort);
+            } catch (NoSuchMethodError e){
+                LOG.error("The --zookeeper option can only be used with Apache Storm 0.9.3 and later.");
+                System.exit(1);
+            }
+        }
+
+        // the submitted topology is closed first, then the cluster is always shut down
+        try (LocalTopology topo = cluster.submitTopology(topologyName, conf, topology)) {
+            Utils.sleep(sleepTime);
+        } finally {
+            cluster.shutdown();
+        }
+    }
+
+    /**
+     * Prints the name, spouts, bolts and streams of a DSL topology to standard output.
+     * Nothing is printed when the topology definition is not a DSL topology.
+     *
+     * @param ctx execution context holding the topology definition to describe
+     */
+    static void printTopologyInfo(ExecutionContext ctx){
+        TopologyDef topology = ctx.getTopologyDef();
+        if (!topology.isDslTopology()) {
+            return;
+        }
+
+        print("---------- TOPOLOGY DETAILS ----------");
+        printf("Topology Name: %s", topology.getName());
+
+        print("--------------- SPOUTS ---------------");
+        for (SpoutDef spout : topology.getSpouts()) {
+            printf("%s [%d] (%s)", spout.getId(), spout.getParallelism(), spout.getClassName());
+        }
+
+        print("---------------- BOLTS ---------------");
+        for (BoltDef bolt : topology.getBolts()) {
+            printf("%s [%d] (%s)", bolt.getId(), bolt.getParallelism(), bolt.getClassName());
+        }
+
+        print("--------------- STREAMS ---------------");
+        for (StreamDef stream : topology.getStreams()) {
+            printf("%s --%s--> %s", stream.getFrom(), stream.getGrouping().getType(), stream.getTo());
+        }
+
+        print("--------------------------------------");
+    }
+
+    /**
+     * Formats {@code args} with {@code format} and prints the result as one line on standard output.
+     */
+    private static void printf(String format, Object... args){
+        String formatted = String.format(format, args);
+        print(formatted);
+    }
+
+    /**
+     * Prints {@code string} as one line on standard output.
+     */
+    private static void print(String string){
+        System.out.println(string);
+    }
+
+    /**
+     * Prints the banner stored in the {@code /splash.txt} classpath resource to standard output,
+     * line by line. Does nothing when the resource is not found.
+     *
+     * @throws IOException if reading the resource fails
+     */
+    private static void printSplash() throws IOException {
+        // banner
+        // try-with-resources skips close() for a null resource, so a missing banner is handled too
+        try (InputStream is = Flux.class.getResourceAsStream("/splash.txt")) {
+            if (is == null) {
+                return;
+            }
+            // closing the underlying stream on exit releases the reader chain as well
+            BufferedReader br = new BufferedReader(new InputStreamReader(is, "UTF-8"));
+            String line;
+            while ((line = br.readLine()) != null) {
+                System.out.println(line);
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/flux/flux-core/src/main/java/org/apache/storm/flux/FluxBuilder.java
----------------------------------------------------------------------
diff --git a/flux/flux-core/src/main/java/org/apache/storm/flux/FluxBuilder.java b/flux/flux-core/src/main/java/org/apache/storm/flux/FluxBuilder.java
new file mode 100644
index 0000000..e79dfb7
--- /dev/null
+++ b/flux/flux-core/src/main/java/org/apache/storm/flux/FluxBuilder.java
@@ -0,0 +1,630 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.flux;
+
+import org.apache.storm.Config;
+import org.apache.storm.generated.StormTopology;
+import org.apache.storm.grouping.CustomStreamGrouping;
+import org.apache.storm.topology.*;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.utils.Utils;
+import org.apache.storm.flux.model.*;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.lang.reflect.*;
+import java.util.*;
+
+public class FluxBuilder {
+    private static Logger LOG = LoggerFactory.getLogger(FluxBuilder.class);
+
+    /**
+     * Creates a new `org.apache.storm.Config` and copies every entry of the topology definition's
+     * `config` section into it.
+     *
+     * @param topologyDef topology definition whose config entries are copied
+     * @return a new config holding the definition's config entries
+     */
+    public static Config buildConfig(TopologyDef topologyDef) {
+        Config merged = new Config();
+        // merge contents of `config` into topology config
+        merged.putAll(topologyDef.getConfig());
+        return merged;
+    }
+
+    /**
+     * Builds a Storm topology from the topology definition held by the execution context.
+     * <p>
+     * The definition is validated first and its components are instantiated. A DSL definition is then
+     * assembled from its spouts, bolts and streams; otherwise the topology is obtained from the
+     * user-supplied topology source.
+     *
+     * @param context execution context holding the topology definition
+     * @return the built topology
+     * @throws IllegalArgumentException if the topology definition fails validation
+     * @throws IllegalAccessException
+     * @throws InstantiationException
+     * @throws ClassNotFoundException
+     * @throws NoSuchMethodException
+     * @throws InvocationTargetException
+     * @throws NoSuchFieldException
+     */
+    public static StormTopology buildTopology(ExecutionContext context) throws IllegalAccessException,
+            InstantiationException, ClassNotFoundException, NoSuchMethodException, InvocationTargetException, NoSuchFieldException {
+
+        TopologyDef topologyDef = context.getTopologyDef();
+        if (!topologyDef.validate()) {
+            throw new IllegalArgumentException("Invalid topology config. Spouts, bolts and streams cannot be " +
+                    "defined in the same configuration as a topologySource.");
+        }
+
+        // components may be referenced by spouts, bolts, etc., so they are built
+        // before anything else and registered with the context
+        buildComponents(context);
+
+        if (!topologyDef.isDslTopology()) {
+            // user class supplied...
+            // this also provides a bridge to Trident...
+            LOG.info("A topology source has been specified...");
+            ObjectDef source = topologyDef.getTopologySource();
+            return buildExternalTopology(source, context);
+        }
+
+        // This is a DSL (YAML, etc.) topology...
+        LOG.info("Detected DSL topology...");
+        TopologyBuilder builder = new TopologyBuilder();
+
+        // create spouts
+        buildSpouts(context, builder);
+
+        // bolts are built up front so they can be looked up by id when the
+        // stream definitions are processed
+        buildBolts(context);
+
+        // process stream definitions
+        buildStreamDefinitions(context, builder);
+
+        return builder.createTopology();
+    }
+
+    /**
+     * Finds a public method on the topology source that has the given name, returns a
+     * `StormTopology` and takes exactly one parameter to which a `java.util.Map` or an
+     * `org.apache.storm.Config` can be assigned.
+     * <p>
+     * When several methods qualify a warning is logged and the first one found is used.
+     *
+     * @param topologySource object to inspect for the specified method
+     * @param methodName name of the method to look for
+     * @return the first matching method
+     * @throws IllegalArgumentException if no matching method exists
+     * @throws NoSuchMethodException declared for callers; not thrown by the code in this method
+     */
+    private static Method findGetTopologyMethod(Object topologySource, String methodName) throws NoSuchMethodException {
+        Class<?> clazz = topologySource.getClass();
+        List<Method> candidates = new ArrayList<Method>();
+        for (Method method : clazz.getMethods()) {
+            if (!method.getName().equals(methodName)) {
+                continue;
+            }
+            if (!method.getReturnType().equals(StormTopology.class)) {
+                continue;
+            }
+            Class<?>[] paramTypes = method.getParameterTypes();
+            if (paramTypes.length != 1) {
+                continue;
+            }
+            // the single parameter must be able to receive either a Map or a Config
+            if (paramTypes[0].isAssignableFrom(Map.class) || paramTypes[0].isAssignableFrom(Config.class)) {
+                candidates.add(method);
+            }
+        }
+
+        if (candidates.isEmpty()) {
+            throw new IllegalArgumentException("Unable to find method '" + methodName + "' in class: " + clazz.getName());
+        }
+        if (candidates.size() > 1) {
+            LOG.warn("Found multiple candidate methods in class '{}'. Using the first one found", clazz.getName());
+        }
+
+        return candidates.get(0);
+    }
+
+    /**
+     * Wires the topology together: for every stream definition the target bolt is registered with the
+     * topology builder (once per bolt id) and the grouping described by the stream is applied to it.
+     *
+     * @param context execution context used to look up bolts and build custom groupings
+     * @param builder topology builder the bolts and groupings are declared on
+     * @throws IllegalArgumentException if a stream targets an unknown bolt, the target is not a supported
+     *         bolt type, or a FIELDS grouping has no arguments
+     * @throws UnsupportedOperationException if the grouping type is not supported
+     */
+    private static void buildStreamDefinitions(ExecutionContext context, TopologyBuilder builder)
+            throws ClassNotFoundException, NoSuchMethodException, InvocationTargetException, InstantiationException,
+            IllegalAccessException, NoSuchFieldException {
+        TopologyDef topologyDef = context.getTopologyDef();
+        // one declarer per target bolt, created the first time a stream points at that bolt
+        Map<String, BoltDeclarer> declarers = new HashMap<String, BoltDeclarer>();
+        for (StreamDef stream : topologyDef.getStreams()) {
+            String from = stream.getFrom();
+            String to = stream.getTo();
+
+            Object boltObj = context.getBolt(to);
+            if (boltObj == null) {
+                // fail with a description of the broken stream instead of a bare NullPointerException
+                throw new IllegalArgumentException("Stream '" + from + "' --> '" + to +
+                        "' references a bolt that has not been defined: " + to);
+            }
+
+            BoltDeclarer declarer = declarers.get(to);
+            if (declarer == null) {
+                declarer = declareBolt(builder, topologyDef, to, boltObj);
+                declarers.put(to, declarer);
+            }
+
+            GroupingDef grouping = stream.getGrouping();
+            // if the streamId is defined, use it for the grouping, otherwise assume storm's default stream
+            String streamId = (grouping.getStreamId() == null ? Utils.DEFAULT_STREAM_ID : grouping.getStreamId());
+
+            switch (grouping.getType()) {
+                case SHUFFLE:
+                    declarer.shuffleGrouping(from, streamId);
+                    break;
+                case FIELDS:
+                    if (grouping.getArgs() == null) {
+                        throw new IllegalArgumentException("FIELDS grouping for stream '" + from + "' --> '" + to +
+                                "' requires 'args' naming the fields to group on.");
+                    }
+                    declarer.fieldsGrouping(from, streamId, new Fields(grouping.getArgs()));
+                    break;
+                case ALL:
+                    declarer.allGrouping(from, streamId);
+                    break;
+                case DIRECT:
+                    declarer.directGrouping(from, streamId);
+                    break;
+                case GLOBAL:
+                    declarer.globalGrouping(from, streamId);
+                    break;
+                case LOCAL_OR_SHUFFLE:
+                    declarer.localOrShuffleGrouping(from, streamId);
+                    break;
+                case NONE:
+                    declarer.noneGrouping(from, streamId);
+                    break;
+                case CUSTOM:
+                    declarer.customGrouping(from, streamId,
+                            buildCustomStreamGrouping(grouping.getCustomClass(), context));
+                    break;
+                default:
+                    throw new UnsupportedOperationException("unsupported grouping type: " + grouping.getType());
+            }
+        }
+    }
+
+    /**
+     * Registers a bolt with the topology builder using the builder overload that matches the bolt's
+     * interface, checked in the order IRichBolt, IBasicBolt, IWindowedBolt, IStatefulBolt.
+     *
+     * @throws IllegalArgumentException if the object implements none of the supported bolt interfaces
+     */
+    private static BoltDeclarer declareBolt(TopologyBuilder builder, TopologyDef topologyDef, String boltId,
+                                            Object boltObj) {
+        if (boltObj instanceof IRichBolt) {
+            return builder.setBolt(boltId, (IRichBolt) boltObj, topologyDef.parallelismForBolt(boltId));
+        }
+        if (boltObj instanceof IBasicBolt) {
+            return builder.setBolt(boltId, (IBasicBolt) boltObj, topologyDef.parallelismForBolt(boltId));
+        }
+        if (boltObj instanceof IWindowedBolt) {
+            return builder.setBolt(boltId, (IWindowedBolt) boltObj, topologyDef.parallelismForBolt(boltId));
+        }
+        if (boltObj instanceof IStatefulBolt) {
+            return builder.setBolt(boltId, (IStatefulBolt) boltObj, topologyDef.parallelismForBolt(boltId));
+        }
+        throw new IllegalArgumentException("Class does not appear to be a bolt: " +
+                boltObj.getClass().getName());
+    }
+
+    /**
+     * Applies the properties declared on an object definition to an instance. Each property value
+     * (or the component it references) is passed to a setter named after the property; when no such
+     * setter exists the value is assigned to the public field with the property's name.
+     *
+     * @param bean object definition carrying the properties
+     * @param instance object the properties are applied to
+     * @param context execution context used to resolve component references
+     * @throws NoSuchFieldException if neither a setter nor a public field exists for a property
+     */
+    private static void applyProperties(ObjectDef bean, Object instance, ExecutionContext context) throws
+            IllegalAccessException, InvocationTargetException, NoSuchFieldException {
+        List<PropertyDef> properties = bean.getProperties();
+        Class target = instance.getClass();
+        if (properties == null) {
+            return;
+        }
+        for (PropertyDef property : properties) {
+            Object value;
+            if (property.isReference()) {
+                value = context.getComponent(property.getRef());
+            } else {
+                value = property.getValue();
+            }
+
+            Method setter = findSetter(target, property.getName(), value);
+            if (setter == null) {
+                // look for a public instance variable
+                LOG.debug("no setter found. Looking for a public instance variable...");
+                Field field = findPublicField(target, property.getName(), value);
+                if (field != null) {
+                    field.set(instance, value);
+                }
+                continue;
+            }
+
+            LOG.debug("found setter, attempting to invoke");
+            // invoke setter
+            setter.invoke(instance, value);
+        }
+    }
+
+    /**
+     * Looks up the public field named {@code property} on {@code clazz}.
+     *
+     * @param clazz class to inspect
+     * @param property name of the field
+     * @param arg value intended for the field; not used by the lookup
+     * @return the public field with the given name
+     * @throws NoSuchFieldException if the class has no public field with that name
+     */
+    private static Field findPublicField(Class clazz, String property, Object arg) throws NoSuchFieldException {
+        return clazz.getField(property);
+    }
+
+    /**
+     * Searches the public methods of {@code clazz} for one named like the setter of {@code property}.
+     * Only the method name is compared; when several methods share the name, the last one
+     * encountered is returned.
+     *
+     * @param clazz class to inspect
+     * @param property property name the setter name is derived from
+     * @param arg value intended for the setter; not used by the lookup
+     * @return the matching method, or {@code null} if there is none
+     */
+    private static Method findSetter(Class clazz, String property, Object arg) {
+        final String expectedName = toSetterName(property);
+        Method match = null;
+        for (Method candidate : clazz.getMethods()) {
+            if (!expectedName.equals(candidate.getName())) {
+                continue;
+            }
+            LOG.debug("Found setter method: " + candidate.getName());
+            match = candidate;
+        }
+        return match;
+    }
+
+    /**
+     * Derives the JavaBean setter name for a property, e.g. {@code "name"} becomes {@code "setName"}.
+     *
+     * @param name property name; must not be null or empty
+     * @return "set" followed by the property name with its first character upper-cased
+     * @throws IllegalArgumentException if {@code name} is null or empty
+     */
+    private static String toSetterName(String name) {
+        if (name == null || name.isEmpty()) {
+            throw new IllegalArgumentException("Property name must not be null or empty.");
+        }
+        // upper-case independently of the default locale: under a Turkish locale "id" would otherwise
+        // become "setİd" and never match a real setter
+        return "set" + name.substring(0, 1).toUpperCase(Locale.ROOT) + name.substring(1);
+    }
+
+    /**
+     * Returns a copy of {@code args} in which every bean reference is replaced by the referenced
+     * component and every bean list reference by a list of the referenced components. All other
+     * arguments are copied unchanged.
+     *
+     * @param args arguments that may contain references
+     * @param context execution context used to look up components
+     * @return a new list with all references resolved
+     */
+    private static List<Object> resolveReferences(List<Object> args, ExecutionContext context) {
+        LOG.debug("Checking arguments for references.");
+        List<Object> resolved = new ArrayList<Object>();
+        for (Object candidate : args) {
+            if (candidate instanceof BeanReference) {
+                BeanReference single = (BeanReference) candidate;
+                resolved.add(context.getComponent(single.getId()));
+                continue;
+            }
+            if (!(candidate instanceof BeanListReference)) {
+                // plain value, nothing to resolve
+                resolved.add(candidate);
+                continue;
+            }
+
+            List<Object> referenced = new ArrayList<>();
+            for (String id : ((BeanListReference) candidate).getIds()) {
+                referenced.add(context.getComponent(id));
+            }
+            LOG.debug("BeanListReference resolved as {}", referenced);
+            resolved.add(referenced);
+        }
+        return resolved;
+    }
+
+    /**
+     * Instantiates the class named by an object definition, using a constructor compatible with the
+     * definition's constructor arguments (references resolved first) or the no-arg constructor when no
+     * arguments are defined, then applies the definition's properties and config methods.
+     *
+     * @param def object definition describing the class, constructor arguments and properties
+     * @param context execution context used to resolve references
+     * @return the fully configured instance
+     * @throws IllegalArgumentException if constructor arguments are defined but no compatible constructor is found
+     */
+    private static Object buildObject(ObjectDef def, ExecutionContext context) throws ClassNotFoundException,
+            IllegalAccessException, InstantiationException, NoSuchMethodException, InvocationTargetException, NoSuchFieldException {
+        Class clazz = Class.forName(def.getClassName());
+        Object instance;
+        if (!def.hasConstructorArgs()) {
+            instance = clazz.newInstance();
+        } else {
+            LOG.debug("Found constructor arguments in definition: " + def.getConstructorArgs().getClass().getName());
+            List<Object> ctorArgs = def.getConstructorArgs();
+            if (def.hasReferences()) {
+                ctorArgs = resolveReferences(ctorArgs, context);
+            }
+            Constructor ctor = findCompatibleConstructor(ctorArgs, clazz);
+            if (ctor == null) {
+                throw new IllegalArgumentException(
+                        String.format("Couldn't find a suitable constructor for class '%s' with arguments '%s'.",
+                                clazz.getName(),
+                                ctorArgs));
+            }
+            LOG.debug("Found something seemingly compatible, attempting invocation...");
+            instance = ctor.newInstance(getArgsWithListCoercian(ctorArgs, ctor.getParameterTypes()));
+        }
+        applyProperties(def, instance, context);
+        invokeConfigMethods(def, instance, context);
+        return instance;
+    }
+
+    /**
+     * Builds the user-supplied topology source object and invokes its topology getter method.
+     * The getter receives a new `Config` populated from the topology definition's config when its
+     * parameter type is exactly `Config`; otherwise it receives the definition's config as-is.
+     *
+     * @param def object definition of the topology source
+     * @param context execution context holding the topology definition
+     * @return the topology returned by the topology source
+     */
+    private static StormTopology buildExternalTopology(ObjectDef def, ExecutionContext context)
+            throws ClassNotFoundException, IllegalAccessException, InstantiationException, NoSuchMethodException,
+            InvocationTargetException, NoSuchFieldException {
+
+        Object topologySource = buildObject(def, context);
+
+        TopologyDef topologyDef = context.getTopologyDef();
+        String methodName = topologyDef.getTopologySource().getMethodName();
+        Method getTopology = findGetTopologyMethod(topologySource, methodName);
+
+        Object argument;
+        if (getTopology.getParameterTypes()[0].equals(Config.class)) {
+            Config config = new Config();
+            config.putAll(topologyDef.getConfig());
+            argument = config;
+        } else {
+            argument = topologyDef.getConfig();
+        }
+        return (StormTopology) getTopology.invoke(topologySource, argument);
+    }
+
+    /**
+     * Instantiates the object described by the given definition and returns it as a
+     * {@code CustomStreamGrouping}. A {@code ClassCastException} results if the built object is of another type.
+     */
+    private static CustomStreamGrouping buildCustomStreamGrouping(ObjectDef def, ExecutionContext context)
+            throws ClassNotFoundException,
+            IllegalAccessException, InstantiationException, NoSuchMethodException, InvocationTargetException, NoSuchFieldException {
+        return (CustomStreamGrouping) buildObject(def, context);
+    }
+
+    /**
+     * Instantiates every component declared in the topology definition and registers each resulting object
+     * with the execution context under the component's id. Does nothing when the topology definition has no
+     * component collection.
+     */
+    private static void buildComponents(ExecutionContext context) throws ClassNotFoundException, NoSuchMethodException,
+            IllegalAccessException, InvocationTargetException, InstantiationException, NoSuchFieldException {
+        Collection<BeanDef> componentDefs = context.getTopologyDef().getComponents();
+        if (componentDefs == null) {
+            return;
+        }
+        for (BeanDef componentDef : componentDefs) {
+            Object component = buildObject(componentDef, context);
+            context.addComponent(componentDef.getId(), component);
+        }
+    }
+
+
+    /**
+     * Builds every spout declared in the topology definition, adds it to the given topology builder using the
+     * definition's id and parallelism, and records it in the execution context under the same id.
+     */
+    private static void buildSpouts(ExecutionContext context, TopologyBuilder builder) throws ClassNotFoundException,
+            NoSuchMethodException, InvocationTargetException, InstantiationException, IllegalAccessException, NoSuchFieldException {
+        for (SpoutDef spoutDef : context.getTopologyDef().getSpouts()) {
+            IRichSpout instance = buildSpout(spoutDef, context);
+            builder.setSpout(spoutDef.getId(), instance, spoutDef.getParallelism());
+            context.addSpout(spoutDef.getId(), instance);
+        }
+    }
+
+    /**
+     * Instantiates the object described by the given spout definition (delegating to {@code buildObject}) and
+     * returns it as an {@code IRichSpout}. A {@code ClassCastException} results if the built object is of
+     * another type.
+     */
+    private static IRichSpout buildSpout(SpoutDef def, ExecutionContext context) throws ClassNotFoundException,
+            IllegalAccessException, InstantiationException, NoSuchMethodException, InvocationTargetException, NoSuchFieldException {
+        Object spout = buildObject(def, context);
+        return (IRichSpout) spout;
+    }
+
+    /**
+     * Instantiates every bolt declared in the topology definition and registers each resulting object with the
+     * execution context under the bolt definition's id.
+     *
+     * Class loading, constructor matching, property application and config-method invocation are all handled
+     * by {@code buildObject}; a missing bolt class therefore still surfaces as a
+     * {@code ClassNotFoundException} from that call.
+     */
+    private static void buildBolts(ExecutionContext context) throws ClassNotFoundException, IllegalAccessException,
+            InstantiationException, NoSuchMethodException, InvocationTargetException, NoSuchFieldException {
+        for (BoltDef def : context.getTopologyDef().getBolts()) {
+            Object bolt = buildObject(def, context);
+            context.addBolt(def.getId(), bolt);
+        }
+    }
+
+    /**
+     * Searches the constructors declared by {@code target} for one that can be invoked with the given
+     * arguments (after any coercion {@code canInvokeWithArgs} allows).
+     *
+     * When several constructors qualify, a warning is logged and the last one encountered wins.
+     *
+     * @param args   the constructor arguments to match
+     * @param target the class whose declared constructors are examined
+     * @return a matching constructor, or {@code null} if none qualifies
+     */
+    private static Constructor findCompatibleConstructor(List<Object> args, Class target) throws NoSuchMethodException {
+        Constructor match = null;
+        int matchCount = 0;
+
+        LOG.debug("Target class: {}, constructor args: {}", target.getName(), args);
+
+        for (Constructor candidate : target.getDeclaredConstructors()) {
+            Class[] paramTypes = candidate.getParameterTypes();
+            if (paramTypes.length != args.size()) {
+                LOG.debug("Skipping constructor with wrong number of arguments.");
+                continue;
+            }
+            LOG.debug("found constructor with same number of args..");
+            boolean invokable = canInvokeWithArgs(args, paramTypes);
+            if (invokable) {
+                match = candidate;
+                matchCount++;
+            }
+            LOG.debug("** invokable --> {}", invokable);
+        }
+
+        if (matchCount > 1) {
+            LOG.warn("Found multiple invokable constructors for class {}, given arguments {}. Using the last one found.",
+                    target, args);
+        }
+        return match;
+    }
+
+
+    /**
+     * Invokes, in order, every configuration method listed on the given definition against {@code instance}.
+     *
+     * For each entry the arguments default to an empty list, bean references are resolved through the
+     * execution context when the entry reports having any, and a compatible public method of the instance's
+     * class is located by name and arguments before being called reflectively.
+     *
+     * @param bean     the definition carrying the configuration method entries
+     * @param instance the object the methods are invoked on
+     * @param context  the execution context used to resolve references
+     * @throws IllegalArgumentException if no compatible method can be found for an entry
+     */
+    public static void invokeConfigMethods(ObjectDef bean, Object instance, ExecutionContext context)
+            throws InvocationTargetException, IllegalAccessException {
+
+        List<ConfigMethodDef> methodDefs = bean.getConfigMethods();
+        if (methodDefs == null || methodDefs.isEmpty()) {
+            return;
+        }
+
+        Class clazz = instance.getClass();
+        for (ConfigMethodDef methodDef : methodDefs) {
+            List<Object> callArgs = methodDef.getArgs();
+            if (callArgs == null) {
+                callArgs = new ArrayList<Object>();
+            }
+            if (methodDef.hasReferences()) {
+                callArgs = resolveReferences(callArgs, context);
+            }
+
+            String methodName = methodDef.getName();
+            Method method = findCompatibleMethod(callArgs, clazz, methodName);
+            if (method == null) {
+                throw new IllegalArgumentException(
+                        String.format("Unable to find configuration method '%s' in class '%s' with arguments %s.",
+                                methodName, clazz.getName(), callArgs));
+            }
+            method.invoke(instance, getArgsWithListCoercian(callArgs, method.getParameterTypes()));
+        }
+    }
+
+    /**
+     * Searches the public methods of {@code target} (as returned by {@code Class.getMethods()}) for one named
+     * {@code methodName} that can be invoked with the given arguments, after any coercion
+     * {@code canInvokeWithArgs} allows.
+     *
+     * When several methods qualify, a warning is logged and the last one encountered wins.
+     *
+     * @param args       the method arguments to match; may be empty but not {@code null}
+     * @param target     the class whose public methods are examined
+     * @param methodName the exact name of the method to look for
+     * @return a matching method, or {@code null} if none qualifies
+     */
+    private static Method findCompatibleMethod(List<Object> args, Class target, String methodName){
+        Method retval = null;
+        int eligibleCount = 0;
+
+        LOG.debug("Target class: {}, methodName: {}, args: {}", target.getName(), methodName, args);
+
+        for (Method method : target.getMethods()) {
+            if (!method.getName().equals(methodName)) {
+                // A differently named method is simply not a candidate; logging it as an
+                // argument-count mismatch would be misleading and floods the debug output.
+                continue;
+            }
+            Class[] paramClasses = method.getParameterTypes();
+            if (paramClasses.length != args.size()) {
+                LOG.debug("Skipping method with wrong number of arguments.");
+                continue;
+            }
+            LOG.debug("found method with same number of args..");
+            // canInvokeWithArgs() accepts an empty argument list, so zero-arg methods need no special case.
+            boolean invokable = canInvokeWithArgs(args, paramClasses);
+            if (invokable) {
+                retval = method;
+                eligibleCount++;
+            }
+            LOG.debug("** invokable --> {}", invokable);
+        }
+        if (eligibleCount > 1) {
+            LOG.warn("Found multiple invokable methods for class {}, method {}, given arguments {}. " +
+                            "Using the last one found.",
+                            new Object[]{target, methodName, args});
+        }
+        return retval;
+    }
+
+    /**
+     * Given a java.util.List of constructor/method arguments, and the parameter types of the constructor or
+     * method to be invoked, convert the list to a java.lang.Object array suitable for reflective invocation.
+     *
+     * Each argument is passed through unchanged when it is already assignable to the parameter type.
+     * Otherwise it is coerced: boxed booleans and numbers to the matching primitive, Strings to enum
+     * constants, and Lists to arrays of the parameter's component type.
+     *
+     * @param args           the arguments to convert; elements must not be {@code null}
+     * @param parameterTypes the declared parameter types, in order
+     * @return the arguments, coerced where necessary, in invocation order
+     * @throws IllegalArgumentException if the sizes differ, an argument is {@code null}, or an argument
+     *                                  cannot be coerced to its parameter type
+     */
+    private static Object[] getArgsWithListCoercian(List<Object> args, Class[] parameterTypes) {
+        if (parameterTypes.length != args.size()) {
+            throw new IllegalArgumentException("Contructor parameter count does not egual argument size.");
+        }
+        Object[] constructorParams = new Object[args.size()];
+
+        // loop through the arguments, if we hit a list that has to be converted to an array,
+        // perform the conversion
+        for (int i = 0; i < args.size(); i++) {
+            Object obj = args.get(i);
+            if (obj == null) {
+                // same contract (and message) as canInvokeWithArgs(), instead of a bare NullPointerException
+                throw new IllegalArgumentException("argument shouldn't be null - index: " + i);
+            }
+            Class paramType = parameterTypes[i];
+            Class objectType = obj.getClass();
+            LOG.debug("Comparing parameter class {} to object class {} to see if assignment is possible.",
+                    paramType, objectType);
+            if (paramType.equals(objectType)) {
+                LOG.debug("They are the same class.");
+                constructorParams[i] = obj;
+                continue;
+            }
+            if (paramType.isAssignableFrom(objectType)) {
+                LOG.debug("Assignment is possible.");
+                constructorParams[i] = obj;
+                continue;
+            }
+            if (isPrimitiveBoolean(paramType) && Boolean.class.isAssignableFrom(objectType)){
+                LOG.debug("Its a primitive boolean.");
+                Boolean bool = (Boolean) obj;
+                constructorParams[i] = bool.booleanValue();
+                continue;
+            }
+            if(isPrimitiveNumber(paramType) && Number.class.isAssignableFrom(objectType)){
+                LOG.debug("Its a primitive number.");
+                Number num = (Number) obj;
+                if(paramType == Float.TYPE){
+                    constructorParams[i] = num.floatValue();
+                } else if (paramType == Double.TYPE) {
+                    constructorParams[i] = num.doubleValue();
+                } else if (paramType == Long.TYPE) {
+                    constructorParams[i] = num.longValue();
+                } else if (paramType == Integer.TYPE) {
+                    constructorParams[i] = num.intValue();
+                } else if (paramType == Short.TYPE) {
+                    constructorParams[i] = num.shortValue();
+                } else if (paramType == Byte.TYPE) {
+                    constructorParams[i] = num.byteValue();
+                } else {
+                    constructorParams[i] = obj;
+                }
+                continue;
+            }
+
+            // enum conversion
+            if(paramType.isEnum() && objectType.equals(String.class)){
+                LOG.debug("Yes, will convert a String to enum");
+                constructorParams[i] = Enum.valueOf(paramType, (String) obj);
+                continue;
+            }
+
+            // List to array conversion
+            if (paramType.isArray() && List.class.isAssignableFrom(objectType)) {
+                // TODO more collection content type checking
+                LOG.debug("Conversion appears possible...");
+                List list = (List) obj;
+                // An empty list is a legitimate argument (it becomes a zero-length array), so the
+                // diagnostic must not dereference the first element unconditionally.
+                Class firstElementType = (list.isEmpty() || list.get(0) == null) ? null : list.get(0).getClass();
+                LOG.debug("Array Type: {}, List type: {}", paramType.getComponentType(), firstElementType);
+
+                // create an array of the right type
+                Object newArrayObj = Array.newInstance(paramType.getComponentType(), list.size());
+                for (int j = 0; j < list.size(); j++) {
+                    Array.set(newArrayObj, j, list.get(j));
+                }
+                constructorParams[i] = newArrayObj;
+                LOG.debug("After conversion: {}", constructorParams[i]);
+                continue;
+            }
+
+            // No rule applied: fail loudly rather than silently passing null for this parameter.
+            throw new IllegalArgumentException(String.format(
+                    "Unable to coerce argument at index %d of type '%s' to parameter type '%s'.",
+                    i, objectType.getName(), paramType.getName()));
+        }
+        return constructorParams;
+    }
+
+
+    /**
+     * Determine if the given constructor/method parameter types are compatible with the given arguments List.
+     * Besides plain assignability, the coercions performed by {@code getArgsWithListCoercian} are taken into
+     * account: boxed boolean/number to primitive, String to enum, and List to array.
+     *
+     * @param args           the candidate arguments; elements must not be {@code null}
+     * @param parameterTypes the declared parameter types, in order
+     * @return {@code true} if every argument can be assigned or coerced to its parameter type,
+     *         {@code false} if the sizes differ or any argument is incompatible
+     * @throws IllegalArgumentException if an argument is {@code null}
+     */
+    private static boolean canInvokeWithArgs(List<Object> args, Class[] parameterTypes) {
+        if (parameterTypes.length != args.size()) {
+            LOG.warn("parameter types were the wrong size");
+            return false;
+        }
+
+        for (int i = 0; i < args.size(); i++) {
+            Object obj = args.get(i);
+            if (obj == null) {
+                throw new IllegalArgumentException("argument shouldn't be null - index: " + i);
+            }
+            Class paramType = parameterTypes[i];
+            Class objectType = obj.getClass();
+            LOG.debug("Comparing parameter class {} to object class {} to see if assignment is possible.",
+                    paramType, objectType);
+            if (paramType.equals(objectType)) {
+                LOG.debug("Yes, they are the same class.");
+            } else if (paramType.isAssignableFrom(objectType)) {
+                LOG.debug("Yes, assignment is possible.");
+            } else if (isPrimitiveBoolean(paramType) && Boolean.class.isAssignableFrom(objectType)){
+                LOG.debug("Yes, assignment is possible.");
+            } else if(isPrimitiveNumber(paramType) && Number.class.isAssignableFrom(objectType)){
+                LOG.debug("Yes, assignment is possible.");
+            } else if(paramType.isEnum() && objectType.equals(String.class)){
+                LOG.debug("Yes, will convert a String to enum");
+            } else if (paramType.isArray() && List.class.isAssignableFrom(objectType)) {
+                // TODO more collection content type checking
+                LOG.debug("Assignment is possible if we convert a List to an array.");
+                // An empty list is a valid argument; do not let a diagnostic message throw
+                // IndexOutOfBoundsException (or NullPointerException on a null first element).
+                List list = (List) obj;
+                Class firstElementType = (list.isEmpty() || list.get(0) == null) ? null : list.get(0).getClass();
+                LOG.debug("Array Type: {}, List type: {}", paramType.getComponentType(), firstElementType);
+            } else {
+                return false;
+            }
+        }
+        return true;
+    }
+
+    /**
+     * Tells whether the given class is one of the primitive numeric types
+     * ({@code byte}, {@code short}, {@code int}, {@code long}, {@code float} or {@code double}).
+     *
+     * {@code boolean}, {@code char} and {@code void} are primitive as far as {@code Class.isPrimitive()} is
+     * concerned but are not numeric: a {@code Number} argument cannot be unboxed into them, so they are
+     * excluded here.
+     *
+     * @param clazz the class to test; must not be {@code null}
+     * @return {@code true} only for the six primitive numeric types
+     */
+    public static boolean isPrimitiveNumber(Class clazz){
+        return clazz.isPrimitive()
+                && !clazz.equals(boolean.class)
+                && !clazz.equals(char.class)
+                && !clazz.equals(void.class);
+    }
+
+    /**
+     * Tells whether the given class is the primitive {@code boolean} type (not the {@code Boolean} wrapper).
+     *
+     * @param clazz the class to test; must not be {@code null}
+     * @return {@code true} only for {@code boolean.class}
+     */
+    public static boolean isPrimitiveBoolean(Class clazz){
+        if (!clazz.isPrimitive()) {
+            return false;
+        }
+        return clazz == Boolean.TYPE;
+    }
+}
+

http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/flux/flux-core/src/main/java/org/apache/storm/flux/api/TopologySource.java
----------------------------------------------------------------------
diff --git a/flux/flux-core/src/main/java/org/apache/storm/flux/api/TopologySource.java b/flux/flux-core/src/main/java/org/apache/storm/flux/api/TopologySource.java
new file mode 100644
index 0000000..2777854
--- /dev/null
+++ b/flux/flux-core/src/main/java/org/apache/storm/flux/api/TopologySource.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.flux.api;
+
+
+import org.apache.storm.generated.StormTopology;
+
+import java.util.Map;
+
+/**
+ * Contract for objects that can produce {@code StormTopology} objects.
+ *
+ * If a {@code topology-source} class implements the {@code getTopology()} method, Flux will
+ * call that method. Otherwise, it will introspect the given class and look for a
+ * similar method that produces a {@code StormTopology} instance.
+ *
+ * Implementing this interface is therefore optional: if a class merely defines a method
+ * with a similar signature, Flux should be able to find and invoke it.
+ */
+public interface TopologySource {
+    /**
+     * Produces the topology.
+     *
+     * @param config the configuration handed to the topology source
+     * @return the topology to run
+     */
+    StormTopology getTopology(Map<String, Object> config);
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/flux/flux-core/src/main/java/org/apache/storm/flux/model/BeanDef.java
----------------------------------------------------------------------
diff --git a/flux/flux-core/src/main/java/org/apache/storm/flux/model/BeanDef.java b/flux/flux-core/src/main/java/org/apache/storm/flux/model/BeanDef.java
new file mode 100644
index 0000000..f0247ed
--- /dev/null
+++ b/flux/flux-core/src/main/java/org/apache/storm/flux/model/BeanDef.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.flux.model;
+
+/**
+ * A representation of a Java object that is uniquely identifiable, and given a className, constructor arguments,
+ * and properties, can be instantiated. This class contributes only the identifier; everything else is
+ * inherited from {@link ObjectDef}.
+ */
+public class BeanDef extends ObjectDef {
+    /** Identifier of this bean definition. */
+    private String id;
+
+    /**
+     * @return the identifier of this bean definition, or {@code null} if none has been set
+     */
+    public String getId() {
+        return this.id;
+    }
+
+    /**
+     * @param id the identifier to assign to this bean definition
+     */
+    public void setId(String id) {
+        this.id = id;
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/flux/flux-core/src/main/java/org/apache/storm/flux/model/BeanListReference.java
----------------------------------------------------------------------
diff --git a/flux/flux-core/src/main/java/org/apache/storm/flux/model/BeanListReference.java b/flux/flux-core/src/main/java/org/apache/storm/flux/model/BeanListReference.java
new file mode 100644
index 0000000..652210c
--- /dev/null
+++ b/flux/flux-core/src/main/java/org/apache/storm/flux/model/BeanListReference.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.flux.model;
+
+import java.util.List;
+
+/**
+ * A bean list reference is a list of bean references, each given as the id of the referenced bean.
+ */
+public class BeanListReference {
+    /** Ids of the referenced beans. Kept public: it is part of this class's accessible surface. */
+    public List<String> ids;
+
+    /** Creates a reference whose id list is initially {@code null}. */
+    public BeanListReference(){}
+
+    /**
+     * Creates a reference to the beans with the given ids. The list is stored as-is, not copied.
+     *
+     * @param ids the ids of the referenced beans
+     */
+    public BeanListReference(List<String> ids){
+        this.ids = ids;
+    }
+
+    /**
+     * @return the ids of the referenced beans, exactly as stored
+     */
+    public List<String> getIds() {
+        return this.ids;
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/flux/flux-core/src/main/java/org/apache/storm/flux/model/BeanReference.java
----------------------------------------------------------------------
diff --git a/flux/flux-core/src/main/java/org/apache/storm/flux/model/BeanReference.java b/flux/flux-core/src/main/java/org/apache/storm/flux/model/BeanReference.java
new file mode 100644
index 0000000..bd236f1
--- /dev/null
+++ b/flux/flux-core/src/main/java/org/apache/storm/flux/model/BeanReference.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.flux.model;
+
+/**
+ * A bean reference is simply a string pointer to another id.
+ */
+public class BeanReference {
+    /** Id of the referenced bean. Kept public: it is part of this class's accessible surface. */
+    public String id;
+
+    /** Creates a reference whose id is initially {@code null}. */
+    public BeanReference(){}
+
+    /**
+     * Creates a reference to the bean with the given id.
+     *
+     * @param id the id of the referenced bean
+     */
+    public BeanReference(String id){
+        this.id = id;
+    }
+
+    /**
+     * @return the id of the referenced bean
+     */
+    public String getId() {
+        return this.id;
+    }
+
+    /**
+     * @param id the id of the bean this reference should point to
+     */
+    public void setId(String id) {
+        this.id = id;
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/flux/flux-core/src/main/java/org/apache/storm/flux/model/BoltDef.java
----------------------------------------------------------------------
diff --git a/flux/flux-core/src/main/java/org/apache/storm/flux/model/BoltDef.java b/flux/flux-core/src/main/java/org/apache/storm/flux/model/BoltDef.java
new file mode 100644
index 0000000..362abf1
--- /dev/null
+++ b/flux/flux-core/src/main/java/org/apache/storm/flux/model/BoltDef.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.flux.model;
+
+/**
+ * Bean representation of a Storm bolt.
+ *
+ * Declares no members of its own; everything it offers is inherited from {@link VertexDef}.
+ */
+public class BoltDef extends VertexDef {
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/flux/flux-core/src/main/java/org/apache/storm/flux/model/ConfigMethodDef.java
----------------------------------------------------------------------
diff --git a/flux/flux-core/src/main/java/org/apache/storm/flux/model/ConfigMethodDef.java b/flux/flux-core/src/main/java/org/apache/storm/flux/model/ConfigMethodDef.java
new file mode 100644
index 0000000..69cabc3
--- /dev/null
+++ b/flux/flux-core/src/main/java/org/apache/storm/flux/model/ConfigMethodDef.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.flux.model;
+
+import java.util.ArrayList;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Definition of a configuration method call: the name of the method and the arguments to pass to it.
+ *
+ * Arguments given as a single-entry map keyed {@code ref} or {@code reflist} are converted to
+ * {@link BeanReference} / {@link BeanListReference} placeholders when the argument list is set, and
+ * {@link #hasReferences()} reports whether any such placeholder is present.
+ */
+public class ConfigMethodDef {
+    private String name;
+    private List<Object> args;
+    private boolean hasReferences = false;
+
+    /**
+     * @return the name of the method to invoke
+     */
+    public String getName() {
+        return name;
+    }
+
+    /**
+     * @param name the name of the method to invoke
+     */
+    public void setName(String name) {
+        this.name = name;
+    }
+
+    /**
+     * @return the arguments as stored by {@link #setArgs(List)}, or {@code null} if never set
+     */
+    public List<Object> getArgs() {
+        return args;
+    }
+
+    /**
+     * Stores a copy of the given argument list, replacing reference maps with reference placeholders.
+     *
+     * A {@code null} list is treated as "no arguments" and results in an empty list. The
+     * {@link #hasReferences()} flag is recomputed on every call so that it always describes the
+     * argument list currently stored.
+     *
+     * @param args the raw arguments; may be {@code null}
+     */
+    public void setArgs(List<Object> args) {
+        // start from a clean slate: the flag must not leak over from a previously set list
+        this.hasReferences = false;
+
+        List<Object> newVal = new ArrayList<Object>();
+        if (args != null) {
+            for (Object obj : args) {
+                newVal.add(convertReference(obj));
+            }
+        }
+        this.args = newVal;
+    }
+
+    /**
+     * @return {@code true} if the stored arguments contain at least one bean reference placeholder
+     */
+    public boolean hasReferences(){
+        return this.hasReferences;
+    }
+
+    /**
+     * Converts a single-entry {@code ref} / {@code reflist} map into the matching reference placeholder and
+     * flags this definition as containing references; any other value is returned unchanged.
+     */
+    @SuppressWarnings("unchecked") // the value under "reflist" is taken to be a list of ids, as before
+    private Object convertReference(Object obj) {
+        if (obj instanceof LinkedHashMap) {
+            Map map = (Map) obj;
+            if (map.size() == 1) {
+                if (map.containsKey("ref")) {
+                    this.hasReferences = true;
+                    return new BeanReference((String) map.get("ref"));
+                }
+                if (map.containsKey("reflist")) {
+                    this.hasReferences = true;
+                    return new BeanListReference((List<String>) map.get("reflist"));
+                }
+            }
+        }
+        return obj;
+    }
+}