You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by ka...@apache.org on 2017/04/05 23:19:12 UTC

[09/23] storm git commit: STORM-2453 Move non-connectors into the top directory

http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/flux/flux-core/src/main/java/org/apache/storm/flux/model/ExecutionContext.java
----------------------------------------------------------------------
diff --git a/flux/flux-core/src/main/java/org/apache/storm/flux/model/ExecutionContext.java b/flux/flux-core/src/main/java/org/apache/storm/flux/model/ExecutionContext.java
new file mode 100644
index 0000000..1520006
--- /dev/null
+++ b/flux/flux-core/src/main/java/org/apache/storm/flux/model/ExecutionContext.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.flux.model;
+
+import org.apache.storm.Config;
+import org.apache.storm.task.IBolt;
+import org.apache.storm.topology.IRichSpout;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Container for all the objects required to instantiate a topology.
+ *
+ * Holds the parsed topology definition and the Storm config it was created with,
+ * plus id-indexed registries for the spouts, bolts and other components added to it.
+ */
+public class ExecutionContext {
+    // parsed Topology definition; intentionally left package-private as originally declared
+    TopologyDef topologyDef;
+
+    // Storm config
+    private final Config config;
+
+    // components (declared but not populated by any method of this class)
+    private List<Object> components;
+    // components, indexed by id
+    private final Map<String, Object> componentMap = new HashMap<String, Object>();
+
+    // spouts, indexed by id
+    private final Map<String, IRichSpout> spoutMap = new HashMap<String, IRichSpout>();
+
+    // bolts (declared but not populated by any method of this class)
+    private List<IBolt> bolts;
+    // bolts, indexed by id; stored as Object, matching addBolt/getBolt
+    private final Map<String, Object> boltMap = new HashMap<String, Object>();
+
+    /**
+     * Creates a context for the given topology definition and Storm config.
+     *
+     * @param topologyDef the parsed topology definition
+     * @param config the Storm config
+     */
+    public ExecutionContext(TopologyDef topologyDef, Config config){
+        this.topologyDef = topologyDef;
+        this.config = config;
+    }
+
+    /**
+     * @return the parsed topology definition held by this context
+     */
+    public TopologyDef getTopologyDef(){
+        return this.topologyDef;
+    }
+
+    /**
+     * Registers a spout under the given id, replacing any spout already stored for that id.
+     */
+    public void addSpout(String id, IRichSpout spout){
+        this.spoutMap.put(id, spout);
+    }
+
+    /**
+     * Registers a bolt under the given id, replacing any bolt already stored for that id.
+     */
+    public void addBolt(String id, Object bolt){
+        this.boltMap.put(id, bolt);
+    }
+
+    /**
+     * @return the bolt registered under {@code id}, or {@code null} if there is none
+     */
+    public Object getBolt(String id){
+        return this.boltMap.get(id);
+    }
+
+    /**
+     * Registers a component under the given id, replacing any component already stored for that id.
+     */
+    public void addComponent(String id, Object value){
+        this.componentMap.put(id, value);
+    }
+
+    /**
+     * @return the component registered under {@code id}, or {@code null} if there is none
+     */
+    public Object getComponent(String id){
+        return this.componentMap.get(id);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/flux/flux-core/src/main/java/org/apache/storm/flux/model/GroupingDef.java
----------------------------------------------------------------------
diff --git a/flux/flux-core/src/main/java/org/apache/storm/flux/model/GroupingDef.java b/flux/flux-core/src/main/java/org/apache/storm/flux/model/GroupingDef.java
new file mode 100644
index 0000000..e4fac8e
--- /dev/null
+++ b/flux/flux-core/src/main/java/org/apache/storm/flux/model/GroupingDef.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.flux.model;
+
+import java.util.List;
+
+/**
+ * Bean representation of a Storm stream grouping.
+ *
+ * Plain property holder: every field is exposed through a getter/setter pair and
+ * no validation is performed here.
+ */
+public class GroupingDef {
+
+    /**
+     * Types of stream groupings Storm allows
+     */
+    public enum Type {
+        ALL,
+        CUSTOM,
+        DIRECT,
+        SHUFFLE,
+        LOCAL_OR_SHUFFLE,
+        FIELDS,
+        GLOBAL,
+        NONE
+    }
+
+    // kind of grouping
+    private Type type;
+    // stream id
+    private String streamId;
+    // grouping arguments -- presumably e.g. field names for FIELDS; confirm against the topology builder
+    private List<String> args;
+    // definition of a custom grouping object -- presumably only meaningful for CUSTOM; confirm
+    private ObjectDef customClass;
+
+    /** @return the grouping type */
+    public Type getType() {
+        return this.type;
+    }
+
+    /** @param type the grouping type */
+    public void setType(Type type) {
+        this.type = type;
+    }
+
+    /** @return the stream id */
+    public String getStreamId() {
+        return this.streamId;
+    }
+
+    /** @param streamId the stream id */
+    public void setStreamId(String streamId) {
+        this.streamId = streamId;
+    }
+
+    /** @return the grouping arguments, or {@code null} if none were set */
+    public List<String> getArgs() {
+        return this.args;
+    }
+
+    /** @param args the grouping arguments */
+    public void setArgs(List<String> args) {
+        this.args = args;
+    }
+
+    /** @return the custom grouping object definition, or {@code null} if none was set */
+    public ObjectDef getCustomClass() {
+        return this.customClass;
+    }
+
+    /** @param customClass the custom grouping object definition */
+    public void setCustomClass(ObjectDef customClass) {
+        this.customClass = customClass;
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/flux/flux-core/src/main/java/org/apache/storm/flux/model/IncludeDef.java
----------------------------------------------------------------------
diff --git a/flux/flux-core/src/main/java/org/apache/storm/flux/model/IncludeDef.java b/flux/flux-core/src/main/java/org/apache/storm/flux/model/IncludeDef.java
new file mode 100644
index 0000000..23fd9d2
--- /dev/null
+++ b/flux/flux-core/src/main/java/org/apache/storm/flux/model/IncludeDef.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.flux.model;
+
+/**
+ * Represents an include. Includes can be either a file or a classpath resource.
+ *
+ * If an include is marked as `override=true` then existing properties will be replaced.
+ *
+ */
+public class IncludeDef {
+    // both flags start out false (the Java default for boolean fields)
+    private boolean resource;
+    // left package-private, as originally declared
+    boolean override;
+    private String file;
+
+    /** @return the include location (a file or a classpath resource) */
+    public String getFile() {
+        return this.file;
+    }
+
+    /** @param file the include location */
+    public void setFile(String file) {
+        this.file = file;
+    }
+
+    /** @return whether the include is flagged as a classpath resource rather than a file */
+    public boolean isResource() {
+        return this.resource;
+    }
+
+    /** @param resource whether the include is a classpath resource */
+    public void setResource(boolean resource) {
+        this.resource = resource;
+    }
+
+    /** @return whether this include is marked `override=true` */
+    public boolean isOverride() {
+        return this.override;
+    }
+
+    /** @param override whether this include is marked `override=true` */
+    public void setOverride(boolean override) {
+        this.override = override;
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/flux/flux-core/src/main/java/org/apache/storm/flux/model/ObjectDef.java
----------------------------------------------------------------------
diff --git a/flux/flux-core/src/main/java/org/apache/storm/flux/model/ObjectDef.java b/flux/flux-core/src/main/java/org/apache/storm/flux/model/ObjectDef.java
new file mode 100644
index 0000000..04a7e8a
--- /dev/null
+++ b/flux/flux-core/src/main/java/org/apache/storm/flux/model/ObjectDef.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.flux.model;
+
+import org.apache.storm.Config;
+
+import java.util.ArrayList;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * A representation of a Java object that given a className, constructor arguments,
+ * and properties, can be instantiated.
+ */
+public class ObjectDef {
+    private String className;
+    private List<Object> constructorArgs;
+    // whether the current constructor args contain a "ref"/"reflist" reference
+    private boolean hasReferences;
+    private List<PropertyDef> properties;
+    private List<ConfigMethodDef> configMethods;
+
+    public String getClassName() {
+        return className;
+    }
+
+    public void setClassName(String className) {
+        this.className = className;
+    }
+
+    public List<Object> getConstructorArgs() {
+        return constructorArgs;
+    }
+
+    /**
+     * Sets the constructor arguments, converting reference placeholders on the way in.
+     *
+     * A {@code LinkedHashMap} argument whose only key is {@code "ref"} is replaced by a
+     * {@link BeanReference}; one whose only key is {@code "reflist"} is replaced by a
+     * {@link BeanListReference}. Every other argument is kept as is.
+     *
+     * @param constructorArgs the raw arguments; {@code null} clears the arguments
+     */
+    public void setConstructorArgs(List<Object> constructorArgs) {
+        if (constructorArgs == null) {
+            this.constructorArgs = null;
+            this.hasReferences = false;
+            return;
+        }
+
+        List<Object> newVal = new ArrayList<Object>(constructorArgs.size());
+        boolean foundReference = false;
+        for (Object obj : constructorArgs) {
+            Object resolved = toReference(obj);
+            // toReference only hands back a different object when it created a reference
+            if (resolved != obj) {
+                foundReference = true;
+            }
+            newVal.add(resolved);
+        }
+        this.constructorArgs = newVal;
+        // recomputed on every call so the flag never reflects a previous argument list
+        this.hasReferences = foundReference;
+    }
+
+    // Turns a single-entry "ref"/"reflist" map into its reference object; returns anything else unchanged.
+    private static Object toReference(Object obj) {
+        if (!(obj instanceof LinkedHashMap)) {
+            return obj;
+        }
+        Map<?, ?> map = (Map<?, ?>) obj;
+        if (map.size() != 1) {
+            return obj;
+        }
+        if (map.containsKey("ref")) {
+            return new BeanReference((String) map.get("ref"));
+        }
+        if (map.containsKey("reflist")) {
+            // element type cannot be checked at runtime; the value is trusted to be a List<String>
+            @SuppressWarnings("unchecked")
+            List<String> ids = (List<String>) map.get("reflist");
+            return new BeanListReference(ids);
+        }
+        return obj;
+    }
+
+    /**
+     * @return {@code true} when at least one constructor argument is set
+     */
+    public boolean hasConstructorArgs(){
+        return this.constructorArgs != null && this.constructorArgs.size() > 0;
+    }
+
+    /**
+     * @return {@code true} when the constructor arguments contain a bean reference
+     */
+    public boolean hasReferences(){
+        return this.hasReferences;
+    }
+
+    public List<PropertyDef> getProperties() {
+        return properties;
+    }
+
+    public void setProperties(List<PropertyDef> properties) {
+        this.properties = properties;
+    }
+
+    public List<ConfigMethodDef> getConfigMethods() {
+        return configMethods;
+    }
+
+    public void setConfigMethods(List<ConfigMethodDef> configMethods) {
+        this.configMethods = configMethods;
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/flux/flux-core/src/main/java/org/apache/storm/flux/model/PropertyDef.java
----------------------------------------------------------------------
diff --git a/flux/flux-core/src/main/java/org/apache/storm/flux/model/PropertyDef.java b/flux/flux-core/src/main/java/org/apache/storm/flux/model/PropertyDef.java
new file mode 100644
index 0000000..f3d7704
--- /dev/null
+++ b/flux/flux-core/src/main/java/org/apache/storm/flux/model/PropertyDef.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.flux.model;
+
+/**
+ * Bean holding a named property that carries either a literal value or a reference
+ * (`ref`), never both.
+ */
+public class PropertyDef {
+    private String name;
+    private Object value;
+    private String ref;
+
+    /** @return the property name */
+    public String getName() {
+        return this.name;
+    }
+
+    /** @param name the property name */
+    public void setName(String name) {
+        this.name = name;
+    }
+
+    /** @return the literal value, or {@code null} if none was set */
+    public Object getValue() {
+        return this.value;
+    }
+
+    /**
+     * Sets the literal value.
+     *
+     * @throws IllegalStateException if a reference has already been set
+     */
+    public void setValue(Object value) {
+        rejectIfSet(this.ref);
+        this.value = value;
+    }
+
+    /** @return the reference, or {@code null} if none was set */
+    public String getRef() {
+        return this.ref;
+    }
+
+    /**
+     * Sets the reference.
+     *
+     * @throws IllegalStateException if a literal value has already been set
+     */
+    public void setRef(String ref) {
+        rejectIfSet(this.value);
+        this.ref = ref;
+    }
+
+    /** @return {@code true} when a reference has been set */
+    public boolean isReference(){
+        return null != this.ref;
+    }
+
+    // Enforces the value-or-reference rule: fails when the other member is already populated.
+    private static void rejectIfSet(Object counterpart) {
+        if (counterpart == null) {
+            return;
+        }
+        throw new IllegalStateException("A property can only have a value OR a reference, not both.");
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/flux/flux-core/src/main/java/org/apache/storm/flux/model/SpoutDef.java
----------------------------------------------------------------------
diff --git a/flux/flux-core/src/main/java/org/apache/storm/flux/model/SpoutDef.java b/flux/flux-core/src/main/java/org/apache/storm/flux/model/SpoutDef.java
new file mode 100644
index 0000000..277c601
--- /dev/null
+++ b/flux/flux-core/src/main/java/org/apache/storm/flux/model/SpoutDef.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.flux.model;
+
+/**
+ * Bean representation of a Storm spout; declares nothing beyond what {@link VertexDef} provides.
+ */
+public class SpoutDef extends VertexDef {
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/flux/flux-core/src/main/java/org/apache/storm/flux/model/StreamDef.java
----------------------------------------------------------------------
diff --git a/flux/flux-core/src/main/java/org/apache/storm/flux/model/StreamDef.java b/flux/flux-core/src/main/java/org/apache/storm/flux/model/StreamDef.java
new file mode 100644
index 0000000..da80f1c
--- /dev/null
+++ b/flux/flux-core/src/main/java/org/apache/storm/flux/model/StreamDef.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.flux.model;
+
+/**
+ * Represents a stream of tuples from one Storm component (Spout or Bolt) to another (an edge in the topology DAG).
+ *
+ * Required fields are `from` and `to`, which define the source and destination, and the stream `grouping`.
+ *
+ */
+public class StreamDef {
+
+    private String name; // not used, placeholder for GUI, etc.
+    private String from; // source of the stream
+    private String to; // destination of the stream
+    private GroupingDef grouping;
+
+    /** @return the stream name (informational only) */
+    public String getName() {
+        return this.name;
+    }
+
+    /** @param name the stream name */
+    public void setName(String name) {
+        this.name = name;
+    }
+
+    /** @return the source of the stream */
+    public String getFrom() {
+        return this.from;
+    }
+
+    /** @param from the source of the stream */
+    public void setFrom(String from) {
+        this.from = from;
+    }
+
+    /** @return the destination of the stream */
+    public String getTo() {
+        return this.to;
+    }
+
+    /** @param to the destination of the stream */
+    public void setTo(String to) {
+        this.to = to;
+    }
+
+    /** @return the stream grouping */
+    public GroupingDef getGrouping() {
+        return this.grouping;
+    }
+
+    /** @param grouping the stream grouping */
+    public void setGrouping(GroupingDef grouping) {
+        this.grouping = grouping;
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/flux/flux-core/src/main/java/org/apache/storm/flux/model/TopologyDef.java
----------------------------------------------------------------------
diff --git a/flux/flux-core/src/main/java/org/apache/storm/flux/model/TopologyDef.java b/flux/flux-core/src/main/java/org/apache/storm/flux/model/TopologyDef.java
new file mode 100644
index 0000000..86614f1
--- /dev/null
+++ b/flux/flux-core/src/main/java/org/apache/storm/flux/model/TopologyDef.java
@@ -0,0 +1,302 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.flux.model;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.*;
+
+/**
+ * Bean representation of a topology.
+ *
+ * It consists of the following:
+ *   1. The topology name
+ *   2. A `java.util.Map` representing the `org.apache.storm.Config` for the topology
+ *   3. A list of spout definitions
+ *   4. A list of bolt definitions
+ *   5. A list of stream definitions that define the flow between spouts and bolts.
+ *
+ */
+public class TopologyDef {
+    private static final Logger LOG = LoggerFactory.getLogger(TopologyDef.class);
+
+    private String name;
+    private Map<String, BeanDef> componentMap = new LinkedHashMap<String, BeanDef>(); // not required
+    private List<IncludeDef> includes; // not required
+    private Map<String, Object> config = new HashMap<String, Object>();
+
+    // a "topology source" is a class that can produce a `StormTopology` thrift object.
+    private TopologySourceDef topologySource;
+
+    // the following are required if we're defining a core storm topology DAG in YAML, etc.
+    private Map<String, BoltDef> boltMap = new LinkedHashMap<String, BoltDef>();
+    private Map<String, SpoutDef> spoutMap = new LinkedHashMap<String, SpoutDef>();
+    private List<StreamDef> streams = new ArrayList<StreamDef>();
+
+
+    public String getName() {
+        return name;
+    }
+
+    public void setName(String name) {
+        this.name = name;
+    }
+
+    /**
+     * Sets the topology name only when none is set yet or {@code override} is true;
+     * otherwise the call is ignored and a warning is logged.
+     */
+    public void setName(String name, boolean override){
+        if(this.name == null || override){
+            this.name = name;
+        } else {
+            LOG.warn("Ignoring attempt to set property 'name' with override == false.");
+        }
+    }
+
+    /**
+     * @return a new list with the spout definitions, in insertion order
+     */
+    public List<SpoutDef> getSpouts() {
+        return new ArrayList<SpoutDef>(this.spoutMap.values());
+    }
+
+    /**
+     * Replaces all spout definitions, indexing them by id.
+     *
+     * @param spouts the new definitions; {@code null} is treated as an empty list
+     */
+    public void setSpouts(List<SpoutDef> spouts) {
+        this.spoutMap = new LinkedHashMap<String, SpoutDef>();
+        if(spouts == null){
+            return;
+        }
+        for(SpoutDef spout : spouts){
+            this.spoutMap.put(spout.getId(), spout);
+        }
+    }
+
+    /**
+     * @return a new list with the bolt definitions, in insertion order
+     */
+    public List<BoltDef> getBolts() {
+        return new ArrayList<BoltDef>(this.boltMap.values());
+    }
+
+    /**
+     * Replaces all bolt definitions, indexing them by id.
+     *
+     * @param bolts the new definitions; {@code null} is treated as an empty list
+     */
+    public void setBolts(List<BoltDef> bolts) {
+        this.boltMap = new LinkedHashMap<String, BoltDef>();
+        if(bolts == null){
+            return;
+        }
+        for(BoltDef bolt : bolts){
+            this.boltMap.put(bolt.getId(), bolt);
+        }
+    }
+
+    public List<StreamDef> getStreams() {
+        return streams;
+    }
+
+    public void setStreams(List<StreamDef> streams) {
+        this.streams = streams;
+    }
+
+    public Map<String, Object> getConfig() {
+        return config;
+    }
+
+    public void setConfig(Map<String, Object> config) {
+        this.config = config;
+    }
+
+    /**
+     * @return a new list with the component definitions, in insertion order
+     */
+    public List<BeanDef> getComponents() {
+        return new ArrayList<BeanDef>(this.componentMap.values());
+    }
+
+    /**
+     * Replaces all component definitions, indexing them by id.
+     *
+     * @param components the new definitions; {@code null} is treated as an empty list
+     */
+    public void setComponents(List<BeanDef> components) {
+        this.componentMap = new LinkedHashMap<String, BeanDef>();
+        if(components == null){
+            return;
+        }
+        for(BeanDef component : components){
+            this.componentMap.put(component.getId(), component);
+        }
+    }
+
+    public List<IncludeDef> getIncludes() {
+        return includes;
+    }
+
+    public void setIncludes(List<IncludeDef> includes) {
+        this.includes = includes;
+    }
+
+    // utility methods
+
+    /**
+     * Looks up the parallelism of a bolt.
+     *
+     * @param boltId id of a bolt defined in this topology
+     * @return the parallelism of that bolt
+     * @throws IllegalArgumentException if no bolt with that id is defined
+     */
+    public int parallelismForBolt(String boltId){
+        BoltDef bolt = this.boltMap.get(boltId);
+        if(bolt == null){
+            // name the offending id instead of failing with a bare NullPointerException
+            throw new IllegalArgumentException("No bolt defined with id '" + boltId + "'.");
+        }
+        return bolt.getParallelism();
+    }
+
+    public BoltDef getBoltDef(String id){
+        return this.boltMap.get(id);
+    }
+
+    public SpoutDef getSpoutDef(String id){
+        return this.spoutMap.get(id);
+    }
+
+    public BeanDef getComponent(String id){
+        return this.componentMap.get(id);
+    }
+
+    // used by includes implementation
+
+    /**
+     * Adds the given bolts; an id that is already defined is only replaced when {@code override} is true.
+     *
+     * @param bolts the definitions to add; {@code null} adds nothing
+     */
+    public void addAllBolts(List<BoltDef> bolts, boolean override){
+        if(bolts == null){
+            return;
+        }
+        for(BoltDef bolt : bolts){
+            String id = bolt.getId();
+            if(this.boltMap.get(id) == null || override) {
+                this.boltMap.put(id, bolt);
+            } else {
+                LOG.warn("Ignoring attempt to create bolt '{}' with override == false.", id);
+            }
+        }
+    }
+
+    /**
+     * Adds the given spouts; an id that is already defined is only replaced when {@code override} is true.
+     *
+     * @param spouts the definitions to add; {@code null} adds nothing
+     */
+    public void addAllSpouts(List<SpoutDef> spouts, boolean override){
+        if(spouts == null){
+            return;
+        }
+        for(SpoutDef spout : spouts){
+            String id = spout.getId();
+            if(this.spoutMap.get(id) == null || override) {
+                this.spoutMap.put(id, spout);
+            } else {
+                LOG.warn("Ignoring attempt to create spout '{}' with override == false.", id);
+            }
+        }
+    }
+
+    /**
+     * Adds the given components; an id that is already defined is only replaced when {@code override} is true.
+     *
+     * @param components the definitions to add; {@code null} adds nothing
+     */
+    public void addAllComponents(List<BeanDef> components, boolean override) {
+        if(components == null){
+            return;
+        }
+        for(BeanDef bean : components){
+            String id = bean.getId();
+            if(this.componentMap.get(id) == null || override) {
+                this.componentMap.put(id, bean);
+            } else {
+                LOG.warn("Ignoring attempt to create component '{}' with override == false.", id);
+            }
+        }
+    }
+
+    /**
+     * Appends the given streams; {@code override} is currently not taken into account.
+     *
+     * @param streams the definitions to add; {@code null} adds nothing
+     */
+    public void addAllStreams(List<StreamDef> streams, boolean override) {
+        //TODO figure out how we want to deal with overrides. Users may want to add streams even when overriding other
+        // properties. For now we just add them blindly which could lead to a potentially invalid topology.
+        if(streams == null){
+            return;
+        }
+        if(this.streams == null){
+            // setStreams(null) leaves no list behind to append to
+            this.streams = new ArrayList<StreamDef>();
+        }
+        this.streams.addAll(streams);
+    }
+
+    public TopologySourceDef getTopologySource() {
+        return topologySource;
+    }
+
+    public void setTopologySource(TopologySourceDef topologySource) {
+        this.topologySource = topologySource;
+    }
+
+    /**
+     * @return {@code true} when no topology source is set
+     */
+    public boolean isDslTopology(){
+        return this.topologySource == null;
+    }
+
+
+    /**
+     * Checks that a topology source and a DSL definition are not both present.
+     *
+     * A topology without a topology source always passes, even when it declares no
+     * spouts, bolts or streams.
+     *
+     * @return {@code false} only when a topology source is set and spouts, bolts or
+     *         streams are defined as well; {@code true} otherwise
+     */
+    public boolean validate(){
+        boolean hasSpouts = this.spoutMap != null && this.spoutMap.size() > 0;
+        boolean hasBolts = this.boltMap != null && this.boltMap.size() > 0;
+        boolean hasStreams = this.streams != null && this.streams.size() > 0;
+        // you can't define a topologySource and a DSL topology at the same time...
+        return isDslTopology() || !(hasSpouts || hasBolts || hasStreams);
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/flux/flux-core/src/main/java/org/apache/storm/flux/model/TopologySourceDef.java
----------------------------------------------------------------------
diff --git a/flux/flux-core/src/main/java/org/apache/storm/flux/model/TopologySourceDef.java b/flux/flux-core/src/main/java/org/apache/storm/flux/model/TopologySourceDef.java
new file mode 100644
index 0000000..d6a2f57
--- /dev/null
+++ b/flux/flux-core/src/main/java/org/apache/storm/flux/model/TopologySourceDef.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.flux.model;
+
+/**
+ * An {@link ObjectDef} that additionally carries a method name, which starts out as
+ * {@link #DEFAULT_METHOD_NAME}.
+ *
+ * NOTE(review): presumably the named method is the one invoked on the instantiated
+ * object to obtain the topology -- confirm against the code that consumes this bean.
+ */
+public class TopologySourceDef extends ObjectDef {
+    public static final String DEFAULT_METHOD_NAME = "getTopology";
+
+    // starts out as the default; changed only through setMethodName
+    private String methodName = DEFAULT_METHOD_NAME;
+
+    /**
+     * Creates a definition whose method name is {@link #DEFAULT_METHOD_NAME}.
+     */
+    public TopologySourceDef(){
+    }
+
+    /** @return the configured method name */
+    public String getMethodName() {
+        return this.methodName;
+    }
+
+    /** @param methodName the method name to use instead of the default */
+    public void setMethodName(String methodName) {
+        this.methodName = methodName;
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/flux/flux-core/src/main/java/org/apache/storm/flux/model/VertexDef.java
----------------------------------------------------------------------
diff --git a/flux/flux-core/src/main/java/org/apache/storm/flux/model/VertexDef.java b/flux/flux-core/src/main/java/org/apache/storm/flux/model/VertexDef.java
new file mode 100644
index 0000000..e71bcc2
--- /dev/null
+++ b/flux/flux-core/src/main/java/org/apache/storm/flux/model/VertexDef.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.flux.model;
+
+/**
+ * Base class for the component definitions of a topology (spouts/bolts); adds a
+ * parallelism setting on top of {@link BeanDef}.
+ */
+public abstract class VertexDef extends BeanDef {
+
+    // default parallelism to 1 so if it's omitted, the topology will still function.
+    private int parallelism = 1;
+
+    /** @param parallelism the parallelism to use; stored as given, without validation */
+    public void setParallelism(int parallelism) {
+        this.parallelism = parallelism;
+    }
+
+    /** @return the configured parallelism, 1 unless it was set explicitly */
+    public int getParallelism() {
+        return this.parallelism;
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/flux/flux-core/src/main/java/org/apache/storm/flux/parser/FluxParser.java
----------------------------------------------------------------------
diff --git a/flux/flux-core/src/main/java/org/apache/storm/flux/parser/FluxParser.java b/flux/flux-core/src/main/java/org/apache/storm/flux/parser/FluxParser.java
new file mode 100644
index 0000000..35904a2
--- /dev/null
+++ b/flux/flux-core/src/main/java/org/apache/storm/flux/parser/FluxParser.java
@@ -0,0 +1,206 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.flux.parser;
+
+import org.apache.storm.flux.model.BoltDef;
+import org.apache.storm.flux.model.IncludeDef;
+import org.apache.storm.flux.model.SpoutDef;
+import org.apache.storm.flux.model.TopologyDef;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.yaml.snakeyaml.TypeDescription;
+import org.yaml.snakeyaml.Yaml;
+import org.yaml.snakeyaml.constructor.Constructor;
+
+import java.io.ByteArrayOutputStream;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Map;
+import java.util.Properties;
+
+public class FluxParser {
+    private static final Logger LOG = LoggerFactory.getLogger(FluxParser.class);
+
+    private FluxParser(){}
+
+    // TODO refactor input stream processing (see parseResource() method).
+    public static TopologyDef parseFile(String inputFile, boolean dumpYaml, boolean processIncludes,
+    	String propertiesFile, boolean envSub) throws IOException {
+   
+        FileInputStream in = new FileInputStream(inputFile);
+        TopologyDef topology = parseInputStream(in, dumpYaml, processIncludes, propertiesFile, envSub);
+        in.close();
+        
+        return topology;
+    }
+
+    public static TopologyDef parseResource(String resource, boolean dumpYaml, boolean processIncludes,
+    	String propertiesFile, boolean envSub) throws IOException {
+        
+        InputStream in = FluxParser.class.getResourceAsStream(resource);
+        TopologyDef topology = parseInputStream(in, dumpYaml, processIncludes, propertiesFile, envSub);
+        in.close();
+        
+        return topology;
+    }
+    
+    public static TopologyDef parseInputStream(InputStream inputStream, boolean dumpYaml, boolean processIncludes,
+    	String propertiesFile, boolean envSub) throws IOException {
+		
+	Yaml yaml = yaml();
+    	
+	if (inputStream == null) {
+		LOG.error("Unable to load input stream");
+		System.exit(1);
+	}
+		
+	TopologyDef topology = loadYaml(yaml, inputStream, propertiesFile, envSub);
+		
+	if (dumpYaml) {
+		dumpYaml(topology, yaml);
+	}
+	
+	if (processIncludes) {
+		return processIncludes(yaml, topology, propertiesFile, envSub);
+	} else {
+		return topology;
+	}
+    }
+
+    private static TopologyDef loadYaml(Yaml yaml, InputStream in, String propsFile, boolean envSubstitution) throws IOException {
+        ByteArrayOutputStream bos = new ByteArrayOutputStream();
+        LOG.info("loading YAML from input stream...");
+        int b = -1;
+        while((b = in.read()) != -1){
+            bos.write(b);
+        }
+
+        // TODO substitution implementation is not exactly efficient or kind to memory...
+        String str = bos.toString();
+        // properties file substitution
+        if(propsFile != null){
+            LOG.info("Performing property substitution.");
+            InputStream propsIn = new FileInputStream(propsFile);
+            Properties props = new Properties();
+            props.load(propsIn);
+            for(Object key : props.keySet()){
+                str = str.replace("${" + key + "}", props.getProperty((String)key));
+            }
+        } else {
+            LOG.info("Not performing property substitution.");
+        }
+
+        // environment variable substitution
+        if(envSubstitution){
+            LOG.info("Performing environment variable substitution...");
+            Map<String, String> envs = System.getenv();
+            for(String key : envs.keySet()){
+                str = str.replace("${ENV-" + key + "}", envs.get(key));
+            }
+        } else {
+            LOG.info("Not performing environment variable substitution.");
+        }
+        return (TopologyDef)yaml.load(str);
+    }
+
+    private static void dumpYaml(TopologyDef topology, Yaml yaml){
+        System.out.println("Configuration (interpreted): \n" + yaml.dump(topology));
+    }
+
+    private static Yaml yaml(){
+        Constructor constructor = new Constructor(TopologyDef.class);
+
+        TypeDescription topologyDescription = new TypeDescription(TopologyDef.class);
+        topologyDescription.putListPropertyType("spouts", SpoutDef.class);
+        topologyDescription.putListPropertyType("bolts", BoltDef.class);
+        topologyDescription.putListPropertyType("includes", IncludeDef.class);
+        constructor.addTypeDescription(topologyDescription);
+
+        Yaml  yaml = new Yaml(constructor);
+        return yaml;
+    }
+
+    /**
+     *
+     * @param yaml the yaml parser for parsing the include file(s)
+     * @param topologyDef the topology definition containing (possibly zero) includes
+     * @return The TopologyDef with includes resolved.
+     */
+    private static TopologyDef processIncludes(Yaml yaml, TopologyDef topologyDef, String propsFile, boolean envSub)
+            throws IOException {
+        //TODO support multiple levels of includes
+        if(topologyDef.getIncludes() != null) {
+            for (IncludeDef include : topologyDef.getIncludes()){
+                TopologyDef includeTopologyDef = null;
+                if (include.isResource()) {
+                    LOG.info("Loading includes from resource: {}", include.getFile());
+                    includeTopologyDef = parseResource(include.getFile(), true, false, propsFile, envSub);
+                } else {
+                    LOG.info("Loading includes from file: {}", include.getFile());
+                    includeTopologyDef = parseFile(include.getFile(), true, false, propsFile, envSub);
+                }
+
+                // if overrides are disabled, we won't replace anything that already exists
+                boolean override = include.isOverride();
+                // name
+                if(includeTopologyDef.getName() != null){
+                    topologyDef.setName(includeTopologyDef.getName(), override);
+                }
+
+                // config
+                if(includeTopologyDef.getConfig() != null) {
+                    //TODO move this logic to the model class
+                    Map<String, Object> config = topologyDef.getConfig();
+                    Map<String, Object> includeConfig = includeTopologyDef.getConfig();
+                    if(override) {
+                        config.putAll(includeTopologyDef.getConfig());
+                    } else {
+                        for(String key : includeConfig.keySet()){
+                            if(config.containsKey(key)){
+                                LOG.warn("Ignoring attempt to set topology config property '{}' with override == false", key);
+                            }
+                            else {
+                                config.put(key, includeConfig.get(key));
+                            }
+                        }
+                    }
+                }
+
+                //component overrides
+                if(includeTopologyDef.getComponents() != null){
+                    topologyDef.addAllComponents(includeTopologyDef.getComponents(), override);
+                }
+                //bolt overrides
+                if(includeTopologyDef.getBolts() != null){
+                    topologyDef.addAllBolts(includeTopologyDef.getBolts(), override);
+                }
+                //spout overrides
+                if(includeTopologyDef.getSpouts() != null) {
+                    topologyDef.addAllSpouts(includeTopologyDef.getSpouts(), override);
+                }
+                //stream overrides
+                //TODO streams should be uniquely identifiable
+                if(includeTopologyDef.getStreams() != null) {
+                    topologyDef.addAllStreams(includeTopologyDef.getStreams(), override);
+                }
+            } // end include processing
+        }
+        return topologyDef;
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/flux/flux-core/src/main/resources/splash.txt
----------------------------------------------------------------------
diff --git a/flux/flux-core/src/main/resources/splash.txt b/flux/flux-core/src/main/resources/splash.txt
new file mode 100644
index 0000000..337931a
--- /dev/null
+++ b/flux/flux-core/src/main/resources/splash.txt
@@ -0,0 +1,9 @@
+\u2588\u2588\u2588\u2588\u2588\u2588\u2588\u2557\u2588\u2588\u2557     \u2588\u2588\u2557   \u2588\u2588\u2557\u2588\u2588\u2557  \u2588\u2588\u2557
+\u2588\u2588\u2554\u2550\u2550\u2550\u2550\u255d\u2588\u2588\u2551     \u2588\u2588\u2551   \u2588\u2588\u2551\u255a\u2588\u2588\u2557\u2588\u2588\u2554\u255d
+\u2588\u2588\u2588\u2588\u2588\u2557  \u2588\u2588\u2551     \u2588\u2588\u2551   \u2588\u2588\u2551 \u255a\u2588\u2588\u2588\u2554\u255d
+\u2588\u2588\u2554\u2550\u2550\u255d  \u2588\u2588\u2551     \u2588\u2588\u2551   \u2588\u2588\u2551 \u2588\u2588\u2554\u2588\u2588\u2557
+\u2588\u2588\u2551     \u2588\u2588\u2588\u2588\u2588\u2588\u2588\u2557\u255a\u2588\u2588\u2588\u2588\u2588\u2588\u2554\u255d\u2588\u2588\u2554\u255d \u2588\u2588\u2557
+\u255a\u2550\u255d     \u255a\u2550\u2550\u2550\u2550\u2550\u2550\u255d \u255a\u2550\u2550\u2550\u2550\u2550\u255d \u255a\u2550\u255d  \u255a\u2550\u255d
++-         Apache Storm        -+
++-  data FLow User eXperience  -+
+Version: ${project.version}

http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/flux/flux-core/src/test/java/org/apache/storm/flux/FluxBuilderTest.java
----------------------------------------------------------------------
diff --git a/flux/flux-core/src/test/java/org/apache/storm/flux/FluxBuilderTest.java b/flux/flux-core/src/test/java/org/apache/storm/flux/FluxBuilderTest.java
new file mode 100644
index 0000000..ff67867
--- /dev/null
+++ b/flux/flux-core/src/test/java/org/apache/storm/flux/FluxBuilderTest.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.flux;
+
+import org.junit.Test;
+import static org.junit.Assert.*;
+
+public class FluxBuilderTest {
+
+    @Test
+    public void testIsPrimitiveNumber() throws Exception {
+        assertTrue(FluxBuilder.isPrimitiveNumber(int.class));
+        assertFalse(FluxBuilder.isPrimitiveNumber(boolean.class));
+        assertFalse(FluxBuilder.isPrimitiveNumber(String.class));
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/flux/flux-core/src/test/java/org/apache/storm/flux/IntegrationTest.java
----------------------------------------------------------------------
diff --git a/flux/flux-core/src/test/java/org/apache/storm/flux/IntegrationTest.java b/flux/flux-core/src/test/java/org/apache/storm/flux/IntegrationTest.java
new file mode 100644
index 0000000..c5807f8
--- /dev/null
+++ b/flux/flux-core/src/test/java/org/apache/storm/flux/IntegrationTest.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.flux;
+
+import org.junit.Test;
+
+public class IntegrationTest {
+
+    private static boolean skipTest = true;
+
+    static {
+        String skipStr = System.getProperty("skipIntegration");
+        if(skipStr != null && skipStr.equalsIgnoreCase("false")){
+            skipTest = false;
+        }
+    }
+
+    @Test
+    public void testRunTopologySource() throws Exception {
+        if(!skipTest) {
+            Flux.main(new String[]{"-s", "30000", "src/test/resources/configs/existing-topology.yaml"});
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/flux/flux-core/src/test/java/org/apache/storm/flux/TCKTest.java
----------------------------------------------------------------------
diff --git a/flux/flux-core/src/test/java/org/apache/storm/flux/TCKTest.java b/flux/flux-core/src/test/java/org/apache/storm/flux/TCKTest.java
new file mode 100644
index 0000000..c9227f6
--- /dev/null
+++ b/flux/flux-core/src/test/java/org/apache/storm/flux/TCKTest.java
@@ -0,0 +1,254 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.flux;
+
+import org.apache.storm.Config;
+import org.apache.storm.generated.StormTopology;
+import org.apache.storm.flux.model.ExecutionContext;
+import org.apache.storm.flux.model.TopologyDef;
+import org.apache.storm.flux.parser.FluxParser;
+import org.apache.storm.flux.test.TestBolt;
+import org.junit.Test;
+
+import static org.junit.Assert.*;
+
+/**
+ * Parses the sample YAML definitions under {@code /configs}, builds a topology from each
+ * and validates the result.
+ */
+public class TCKTest {
+
+    /** Parses a classpath resource with includes enabled and no substitution. */
+    private static TopologyDef parse(String resource) throws Exception {
+        return FluxParser.parseResource(resource, false, true, null, false);
+    }
+
+    /** Builds the config for a definition and wraps both in an execution context. */
+    private static ExecutionContext newContext(TopologyDef def) throws Exception {
+        Config conf = FluxBuilder.buildConfig(def);
+        return new ExecutionContext(def, conf);
+    }
+
+    /** Builds the topology for a context, requires it to be non-null and validates it. */
+    private static void buildAndValidate(ExecutionContext ctx) throws Exception {
+        StormTopology topology = FluxBuilder.buildTopology(ctx);
+        assertNotNull(topology);
+        topology.validate();
+    }
+
+    /** Parse, build and validate without checking the definition itself. */
+    private static void checkResource(String resource) throws Exception {
+        buildAndValidate(newContext(parse(resource)));
+    }
+
+    /** Parse, require the definition to validate, then build and validate the topology. */
+    private static ExecutionContext checkValidResource(String resource) throws Exception {
+        TopologyDef def = parse(resource);
+        assertTrue(def.validate());
+        ExecutionContext ctx = newContext(def);
+        buildAndValidate(ctx);
+        return ctx;
+    }
+
+    @Test
+    public void testTCK() throws Exception {
+        checkResource("/configs/tck.yaml");
+    }
+
+    @Test
+    public void testShellComponents() throws Exception {
+        checkResource("/configs/shell_test.yaml");
+    }
+
+    @Test(expected = IllegalArgumentException.class)
+    public void testBadShellComponents() throws Exception {
+        checkResource("/configs/bad_shell_test.yaml");
+    }
+
+    @Test
+    public void testKafkaSpoutConfig() throws Exception {
+        checkResource("/configs/kafka_test.yaml");
+    }
+
+    @Test
+    public void testLoadFromResource() throws Exception {
+        checkResource("/configs/kafka_test.yaml");
+    }
+
+
+    @Test
+    public void testHdfs() throws Exception {
+        checkResource("/configs/hdfs_test.yaml");
+    }
+
+    @Test
+    public void testDiamondTopology() throws Exception {
+        checkResource("/configs/diamond-topology.yaml");
+    }
+
+
+    @Test
+    public void testHbase() throws Exception {
+        checkResource("/configs/simple_hbase.yaml");
+    }
+
+    @Test(expected = IllegalArgumentException.class)
+    public void testBadHbase() throws Exception {
+        checkResource("/configs/bad_hbase.yaml");
+    }
+
+    @Test
+    public void testIncludes() throws Exception {
+        TopologyDef topologyDef = parse("/configs/include_test.yaml");
+        StormTopology topology = FluxBuilder.buildTopology(newContext(topologyDef));
+        assertNotNull(topology);
+        // the merged definition must carry the name, bolts and spouts from the include
+        assertTrue(topologyDef.getName().equals("include-topology"));
+        assertTrue(topologyDef.getBolts().size() > 0);
+        assertTrue(topologyDef.getSpouts().size() > 0);
+        topology.validate();
+    }
+
+    @Test
+    public void testTopologySource() throws Exception {
+        checkValidResource("/configs/existing-topology.yaml");
+    }
+
+    @Test
+    public void testTopologySourceWithReflection() throws Exception {
+        checkValidResource("/configs/existing-topology-reflection.yaml");
+    }
+
+    @Test
+    public void testTopologySourceWithConfigParam() throws Exception {
+        checkValidResource("/configs/existing-topology-reflection-config.yaml");
+    }
+
+    @Test
+    public void testTopologySourceWithMethodName() throws Exception {
+        checkValidResource("/configs/existing-topology-method-override.yaml");
+    }
+
+
+    @Test
+    public void testTridentTopologySource() throws Exception {
+        checkValidResource("/configs/existing-topology-trident.yaml");
+    }
+
+    @Test(expected = IllegalArgumentException.class)
+    public void testInvalidTopologySource() throws Exception {
+        TopologyDef topologyDef = parse("/configs/invalid-existing-topology.yaml");
+        assertFalse("Topology config is invalid.", topologyDef.validate());
+        FluxBuilder.buildTopology(newContext(topologyDef));
+    }
+
+
+    @Test
+    public void testTopologySourceWithGetMethodName() throws Exception {
+        checkValidResource("/configs/existing-topology-reflection.yaml");
+    }
+
+    @Test
+    public void testTopologySourceWithConfigMethods() throws Exception {
+        ExecutionContext context = checkValidResource("/configs/config-methods-test.yaml");
+
+        // make sure the property was actually set
+        TestBolt bolt = (TestBolt)context.getBolt("bolt-1");
+        assertTrue(bolt.getFoo().equals("foo"));
+        assertTrue(bolt.getBar().equals("bar"));
+        assertTrue(bolt.getFooBar().equals("foobar"));
+        TestBolt.TestClass[] expectedClasses = {
+                new TestBolt.TestClass("foo"),
+                new TestBolt.TestClass("bar"),
+                new TestBolt.TestClass("baz")
+        };
+        assertArrayEquals(expectedClasses, bolt.getClasses());
+    }
+
+    @Test
+    public void testVariableSubstitution() throws Exception {
+        // this case alone enables properties-file and environment substitution
+        TopologyDef topologyDef = FluxParser.parseResource("/configs/substitution-test.yaml", false, true, "src/test/resources/configs/test.properties", true);
+        assertTrue(topologyDef.validate());
+        ExecutionContext context = newContext(topologyDef);
+        buildAndValidate(context);
+
+        // test basic substitution
+        assertEquals("Property not replaced.",
+                "substitution-topology",
+                context.getTopologyDef().getName());
+
+        // test environment variable substitution
+        // $PATH should be defined on most systems
+        String envPath = System.getenv().get("PATH");
+        assertEquals("ENV variable not replaced.",
+                envPath,
+                context.getTopologyDef().getConfig().get("test.env.value"));
+
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/flux/flux-core/src/test/java/org/apache/storm/flux/multilang/MultilangEnvirontmentTest.java
----------------------------------------------------------------------
diff --git a/flux/flux-core/src/test/java/org/apache/storm/flux/multilang/MultilangEnvirontmentTest.java b/flux/flux-core/src/test/java/org/apache/storm/flux/multilang/MultilangEnvirontmentTest.java
new file mode 100644
index 0000000..dcded17
--- /dev/null
+++ b/flux/flux-core/src/test/java/org/apache/storm/flux/multilang/MultilangEnvirontmentTest.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.flux.multilang;
+
+
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.ByteArrayOutputStream;
+import java.io.InputStream;
+import java.io.OutputStream;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Sanity checks to make sure we can at least invoke the shells used.
+ */
+public class MultilangEnvirontmentTest {
+    private static final Logger LOG = LoggerFactory.getLogger(MultilangEnvirontmentTest.class);
+
+    @Test
+    public void testInvokePython() throws Exception {
+        String[] command = new String[]{"python", "--version"};
+        int exitVal = invokeCommand(command);
+        assertEquals("Exit value for python is 0.", 0, exitVal);
+    }
+
+    @Test
+    public void testInvokeNode() throws Exception {
+        String[] command = new String[]{"node", "--version"};
+        int exitVal = invokeCommand(command);
+        assertEquals("Exit value for node is 0.", 0, exitVal);
+    }
+
+    private static class StreamRedirect implements Runnable {
+        private InputStream in;
+        private OutputStream out;
+
+        public StreamRedirect(InputStream in, OutputStream out) {
+            this.in = in;
+            this.out = out;
+        }
+
+        @Override
+        public void run() {
+            try {
+                int i = -1;
+                while ((i = this.in.read()) != -1) {
+                    out.write(i);
+                }
+                this.in.close();
+                this.out.close();
+            } catch (Exception e) {
+                e.printStackTrace();
+            }
+        }
+    }
+
+    private int invokeCommand(String[] args) throws Exception {
+        LOG.debug("Invoking command: {}", args);
+
+        ProcessBuilder pb = new ProcessBuilder(args);
+        pb.redirectErrorStream(true);
+        final Process proc = pb.start();
+
+        ByteArrayOutputStream out = new ByteArrayOutputStream();
+        Thread t = new Thread(new StreamRedirect(proc.getInputStream(), out));
+        t.start();
+        int exitVal = proc.waitFor();
+        LOG.debug("Command result: {}", out.toString());
+        return exitVal;
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/flux/flux-core/src/test/java/org/apache/storm/flux/test/SimpleTopology.java
----------------------------------------------------------------------
diff --git a/flux/flux-core/src/test/java/org/apache/storm/flux/test/SimpleTopology.java b/flux/flux-core/src/test/java/org/apache/storm/flux/test/SimpleTopology.java
new file mode 100644
index 0000000..ff65a8a
--- /dev/null
+++ b/flux/flux-core/src/test/java/org/apache/storm/flux/test/SimpleTopology.java
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.flux.test;
+
+import org.apache.storm.generated.StormTopology;
+import org.apache.storm.topology.TopologyBuilder;
+import org.apache.storm.flux.wrappers.bolts.LogInfoBolt;
+import org.apache.storm.flux.wrappers.spouts.FluxShellSpout;
+
+import java.util.Map;
+
+/**
+ * Test topology source that does not implement TopologySource, but has the same
+ * `getTopology()` method.
+ */
+public class SimpleTopology{
+
+
+    public SimpleTopology(){}
+
+    public SimpleTopology(String foo, String bar){}
+
+    public StormTopology getTopologyWithDifferentMethodName(Map<String, Object> config){
+        return getTopology(config);
+    }
+
+
+    public StormTopology getTopology(Map<String, Object> config) {
+        TopologyBuilder builder = new TopologyBuilder();
+
+        // spouts
+        FluxShellSpout spout = new FluxShellSpout(
+                new String[]{"node", "randomsentence.js"},
+                new String[]{"word"});
+        builder.setSpout("sentence-spout", spout, 1);
+
+        // bolts
+        builder.setBolt("log-bolt", new LogInfoBolt(), 1)
+                .shuffleGrouping("sentence-spout");
+
+        return builder.createTopology();
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/flux/flux-core/src/test/java/org/apache/storm/flux/test/SimpleTopologySource.java
----------------------------------------------------------------------
diff --git a/flux/flux-core/src/test/java/org/apache/storm/flux/test/SimpleTopologySource.java b/flux/flux-core/src/test/java/org/apache/storm/flux/test/SimpleTopologySource.java
new file mode 100644
index 0000000..2fadacf
--- /dev/null
+++ b/flux/flux-core/src/test/java/org/apache/storm/flux/test/SimpleTopologySource.java
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.flux.test;
+
+import org.apache.storm.generated.StormTopology;
+import org.apache.storm.topology.TopologyBuilder;
+import org.apache.storm.flux.api.TopologySource;
+import org.apache.storm.flux.wrappers.bolts.LogInfoBolt;
+import org.apache.storm.flux.wrappers.spouts.FluxShellSpout;
+
+import java.util.Map;
+
+public class SimpleTopologySource implements TopologySource {
+    // Test fixture: a TopologySource implementation that returns a fixed spout-to-bolt topology.
+    // Both constructors have empty bodies; the two String arguments are ignored.
+    public SimpleTopologySource(){}
+
+    public SimpleTopologySource(String foo, String bar){}
+
+    // Wires one FluxShellSpout to one LogInfoBolt; the config argument is not read.
+    @Override
+    public StormTopology getTopology(Map<String, Object> config) {
+        TopologyBuilder builder = new TopologyBuilder();
+
+        // spout: FluxShellSpout given {"node", "randomsentence.js"} and {"word"}
+        FluxShellSpout spout = new FluxShellSpout(
+                new String[]{"node", "randomsentence.js"},
+                new String[]{"word"});
+        builder.setSpout("sentence-spout", spout, 1);
+
+        // bolt: a single LogInfoBolt subscribed to "sentence-spout" via shuffle grouping
+        builder.setBolt("log-bolt", new LogInfoBolt(), 1)
+                .shuffleGrouping("sentence-spout");
+
+        return builder.createTopology();
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/flux/flux-core/src/test/java/org/apache/storm/flux/test/SimpleTopologyWithConfigParam.java
----------------------------------------------------------------------
diff --git a/flux/flux-core/src/test/java/org/apache/storm/flux/test/SimpleTopologyWithConfigParam.java b/flux/flux-core/src/test/java/org/apache/storm/flux/test/SimpleTopologyWithConfigParam.java
new file mode 100644
index 0000000..78195b5
--- /dev/null
+++ b/flux/flux-core/src/test/java/org/apache/storm/flux/test/SimpleTopologyWithConfigParam.java
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.flux.test;
+
+import org.apache.storm.Config;
+import org.apache.storm.generated.StormTopology;
+import org.apache.storm.topology.TopologyBuilder;
+import org.apache.storm.flux.wrappers.bolts.LogInfoBolt;
+import org.apache.storm.flux.wrappers.spouts.FluxShellSpout;
+
+/**
+ * Test topology source that does not implement TopologySource; its
+ * `getTopology()` method takes a `Config` argument.
+ */
+public class SimpleTopologyWithConfigParam {
+
+    // Both constructors have empty bodies; the two String arguments are ignored.
+    public SimpleTopologyWithConfigParam(){}
+
+    public SimpleTopologyWithConfigParam(String foo, String bar){}
+
+    // Wires one FluxShellSpout to one LogInfoBolt; the config argument is not read.
+    public StormTopology getTopology(Config config) {
+        TopologyBuilder builder = new TopologyBuilder();
+
+        // spout: FluxShellSpout given {"node", "randomsentence.js"} and {"word"}
+        FluxShellSpout spout = new FluxShellSpout(
+                new String[]{"node", "randomsentence.js"},
+                new String[]{"word"});
+        builder.setSpout("sentence-spout", spout, 1);
+
+        // bolt: a single LogInfoBolt subscribed to "sentence-spout" via shuffle grouping
+        builder.setBolt("log-bolt", new LogInfoBolt(), 1)
+                .shuffleGrouping("sentence-spout");
+
+        return builder.createTopology();
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/flux/flux-core/src/test/java/org/apache/storm/flux/test/TestBolt.java
----------------------------------------------------------------------
diff --git a/flux/flux-core/src/test/java/org/apache/storm/flux/test/TestBolt.java b/flux/flux-core/src/test/java/org/apache/storm/flux/test/TestBolt.java
new file mode 100644
index 0000000..28d11b6
--- /dev/null
+++ b/flux/flux-core/src/test/java/org/apache/storm/flux/test/TestBolt.java
@@ -0,0 +1,131 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.flux.test;
+
+import org.apache.storm.topology.BasicOutputCollector;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.topology.base.BaseBasicBolt;
+import org.apache.storm.tuple.Tuple;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Serializable;
+
+
+public class TestBolt extends BaseBasicBolt {
+    private static final Logger LOG = LoggerFactory.getLogger(TestBolt.class);
+    // State written by the with*() config methods below; `none` has no getter.
+    private String foo;
+    private String bar;
+    private String fooBar;
+    private String none;
+    private TestClass[] classes;
+    // Serializable value holder whose equals()/hashCode() depend only on `field`.
+    public static class TestClass implements Serializable {
+        private String field;
+
+        public TestClass(String field) {
+            this.field = field;
+        }
+
+        public String getField() {
+            return field;
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            if (this == o) return true;
+            if (!(o instanceof TestClass)) return false;
+
+            TestClass testClass = (TestClass) o;
+            // Null-safe comparison of the two wrapped field values.
+            return getField() != null ? getField().equals(testClass.getField()) : testClass.getField() == null;
+        }
+
+        @Override
+        public int hashCode() {
+            return getField() != null ? getField().hashCode() : 0;
+        }
+    }
+
+    // Enum type taken as the first argument by every TestBolt constructor.
+    public static enum TestEnum {
+        FOO,
+        BAR
+    }
+    // All four constructors have empty bodies; their arguments are not stored.
+    public TestBolt(TestEnum te){
+
+    }
+
+    public TestBolt(TestEnum te, float f){
+
+    }
+
+    public TestBolt(TestEnum te, float f, boolean b){
+
+    }
+
+    public TestBolt(TestEnum te, float f, boolean b, TestClass... str) {
+
+    }
+    // Logs each incoming tuple at INFO level; nothing is emitted to the collector.
+    @Override
+    public void execute(Tuple tuple, BasicOutputCollector basicOutputCollector) {
+        LOG.info("{}", tuple);
+    }
+
+    @Override
+    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
+        // no output fields are declared
+    }
+
+    // config methods (referenced by name from the "configMethods" section of a Flux YAML definition)
+    public void withFoo(String foo){
+        this.foo = foo;
+    }
+    public void withNone(){
+        this.none = "hit";
+    }
+    public void withBar(String bar){
+        this.bar = bar;
+    }
+
+    public void withFooBar(String foo, String bar){
+        this.fooBar = foo + bar;
+    }
+
+    public void withClasses(TestClass...classes) {
+        this.classes = classes;
+    }
+
+    public String getFoo(){
+        return this.foo;
+    }
+    public String getBar(){
+        return this.bar;
+    }
+
+    public String getFooBar(){
+        return this.fooBar;
+    }
+
+    public TestClass[] getClasses() {
+        return classes;
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/flux/flux-core/src/test/java/org/apache/storm/flux/test/TridentTopologySource.java
----------------------------------------------------------------------
diff --git a/flux/flux-core/src/test/java/org/apache/storm/flux/test/TridentTopologySource.java b/flux/flux-core/src/test/java/org/apache/storm/flux/test/TridentTopologySource.java
new file mode 100644
index 0000000..36b272b
--- /dev/null
+++ b/flux/flux-core/src/test/java/org/apache/storm/flux/test/TridentTopologySource.java
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.flux.test;
+
+import org.apache.storm.Config;
+import org.apache.storm.generated.StormTopology;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Values;
+import org.apache.storm.kafka.StringScheme;
+import org.apache.storm.trident.TridentTopology;
+import org.apache.storm.trident.operation.BaseFunction;
+import org.apache.storm.trident.operation.TridentCollector;
+import org.apache.storm.trident.operation.builtin.Count;
+import org.apache.storm.trident.testing.FixedBatchSpout;
+import org.apache.storm.trident.testing.MemoryMapState;
+import org.apache.storm.trident.tuple.TridentTuple;
+
+/**
+ * Basic Trident word-count example that returns a `StormTopology` from a `getTopology(Config)` method.
+ */
+public class TridentTopologySource {
+    // Reassigned on every getTopology() call.
+    private FixedBatchSpout spout;
+    // Builds the Trident topology; the config argument is not read.
+    public StormTopology getTopology(Config config) {
+        // Spout constructed from five fixed single-field ("sentence") values.
+        this.spout = new FixedBatchSpout(new Fields("sentence"), 20,
+                new Values("one two"),
+                new Values("two three"),
+                new Values("three four"),
+                new Values("four five"),
+                new Values("five six")
+        );
+
+        // "sentence" -> Split -> "word", grouped by "word", Count aggregated into "count".
+        TridentTopology trident = new TridentTopology();
+
+        trident.newStream("wordcount", spout).name("sentence").parallelismHint(1).shuffle()
+                .each(new Fields("sentence"), new Split(), new Fields("word"))
+                .parallelismHint(1)
+                .groupBy(new Fields("word"))
+                .persistentAggregate(new MemoryMapState.Factory(), new Count(), new Fields("count"))
+                .parallelismHint(1);
+        return trident.build();
+    }
+    // Function that emits one tuple per space-delimited piece of the input tuple's first string.
+    public static class Split extends BaseFunction {
+        @Override
+        public void execute(TridentTuple tuple, TridentCollector collector) {
+            String sentence = tuple.getString(0);
+            for (String word : sentence.split(" ")) {
+                collector.emit(new Values(word));
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/flux/flux-core/src/test/resources/configs/bad_hbase.yaml
----------------------------------------------------------------------
diff --git a/flux/flux-core/src/test/resources/configs/bad_hbase.yaml b/flux/flux-core/src/test/resources/configs/bad_hbase.yaml
new file mode 100644
index 0000000..a29e314
--- /dev/null
+++ b/flux/flux-core/src/test/resources/configs/bad_hbase.yaml
@@ -0,0 +1,98 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# Test that a component with a bad constructor argument (see "counterFields" below) is rejected
+---
+
+# topology definition
+# name to be used when submitting
+name: "hbase-wordcount"
+
+# Components
+# Components are analogous to Spring beans. They are meant to be used as constructor,
+# property(setter), and builder arguments.
+#
+# for the time being, components must be declared in the order they are referenced
+
+components:
+  - id: "columnFields"
+    className: "org.apache.storm.tuple.Fields"
+    constructorArgs:
+      - ["word"]
+
+  - id: "counterFields"
+    className: "org.apache.storm.tuple.Fields"
+    constructorArgs:
+      # !!! the following won't work, and should throw an IllegalArgumentException...
+      - "count"
+
+  - id: "mapper"
+    className: "org.apache.storm.hbase.bolt.mapper.SimpleHBaseMapper"
+    configMethods:
+      - name: "withRowKeyField"
+        args: ["word"]
+      - name: "withColumnFields"
+        args: [ref: "columnFields"]
+      - name: "withCounterFields"
+        args: [ref: "counterFields"]
+      - name: "withColumnFamily"
+        args: ["cf"]
+
+# topology configuration
+# this will be passed to the submitter as a map of config options
+#
+config:
+  topology.workers: 1
+  hbase.conf:
+    hbase.rootdir: "hdfs://hadoop:54310/hbase"
+    hbase.zookeeper.quorum: "hadoop"
+
+# spout definitions
+spouts:
+  - id: "word-spout"
+    className: "org.apache.storm.testing.TestWordSpout"
+    parallelism: 1
+
+# bolt definitions
+
+bolts:
+  - id: "count-bolt"
+    className: "org.apache.storm.testing.TestWordCounter"
+
+  - id: "hbase-bolt"
+    className: "org.apache.storm.hbase.bolt.HBaseBolt"
+    constructorArgs:
+      - "WordCount" # HBase table name
+      - ref: "mapper"
+    configMethods:
+      - name: "withConfigKey"
+        args: ["hbase.conf"]
+    parallelism: 1
+
+
+streams:
+  - name: "" # name isn't used (placeholder for logging, UI, etc.)
+    from: "word-spout"
+    to: "count-bolt"
+    grouping:
+      type: SHUFFLE
+
+  - name: "" # name isn't used (placeholder for logging, UI, etc.)
+    from: "count-bolt"
+    to: "hbase-bolt"
+    grouping:
+      type: FIELDS
+      args: ["word"]

http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/flux/flux-core/src/test/resources/configs/bad_shell_test.yaml
----------------------------------------------------------------------
diff --git a/flux/flux-core/src/test/resources/configs/bad_shell_test.yaml b/flux/flux-core/src/test/resources/configs/bad_shell_test.yaml
new file mode 100644
index 0000000..0892ce7
--- /dev/null
+++ b/flux/flux-core/src/test/resources/configs/bad_shell_test.yaml
@@ -0,0 +1,118 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# Test ability to wire together shell spouts/bolts
+---
+
+# topology definition
+# name to be used when submitting
+name: "shell-topology"
+
+# Components
+# Components are analogous to Spring beans. They are meant to be used as constructor,
+# property(setter), and builder arguments.
+#components:
+#  - id: "myComponent"
+#    className: "com.foo.bar.MyComponent"
+#    constructorArgs:
+#      - ...
+#    properties:
+#      foo: "bar"
+#      bar: "foo"
+
+# NOTE: We may want to consider some level of spring integration. For example, allowing component references
+# to a spring `ApplicationContext`.
+
+# topology configuration
+# this will be passed to the submitter as a map of config options
+#
+config:
+  topology.workers: 1
+  # ...
+
+# spout definitions
+spouts:
+  - id: "sentence-spout"
+    className: "org.apache.storm.flux.wrappers.spouts.FluxShellSpout"
+    # shell spout constructor takes 2 arguments: String[], String[]
+    constructorArgs:
+      # command line
+      - ["node", "randomsentence.js"]
+      # output fields
+      - ["word"]
+    configMethods:
+      - name: "addComponentConfig"
+        args: ["rabbitmq.configfile", "etc/rabbit.yml", "hello"]
+      - name: "addComponentConfig"
+        args:
+        - "publisher.data_paths"
+        - ["actions", "hello"]
+    parallelism: 1
+    # ...
+
+# bolt definitions
+bolts:
+  - id: "splitsentence"
+    className: "org.apache.storm.flux.wrappers.bolts.FluxShellBolt"
+    constructorArgs:
+      # command line
+      - ["python", "splitsentence.py"]
+      # output fields
+      - ["word"]
+    configMethods:
+      - name: "addComponentConfig"
+        args: ["rabbitmq.configfile", "etc/rabbit.yml", "hello"]
+      - name: "addComponentConfig"
+        args:
+        - "publisher.data_paths"
+        - ["actions", "hello"]
+    parallelism: 1
+    # ...
+
+  - id: "log"
+    className: "org.apache.storm.flux.wrappers.bolts.LogInfoBolt"
+    parallelism: 1
+    # ...
+
+  - id: "count"
+    className: "org.apache.storm.testing.TestWordCounter"
+    parallelism: 1
+    # ...
+
+#stream definitions
+# stream definitions define connections between spouts and bolts.
+# note that such connections can be cyclical
+# custom stream groupings are also supported
+
+streams:
+  - name: "spout --> split" # name isn't used (placeholder for logging, UI, etc.)
+    from: "sentence-spout"
+    to: "splitsentence"
+    grouping:
+      type: SHUFFLE
+
+  - name: "split --> count"
+    from: "splitsentence"
+    to: "count"
+    grouping:
+      type: FIELDS
+      args: ["word"]
+
+  - name: "count --> log"
+    from: "count"
+    to: "log"
+    grouping:
+      type: SHUFFLE

http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/flux/flux-core/src/test/resources/configs/config-methods-test.yaml
----------------------------------------------------------------------
diff --git a/flux/flux-core/src/test/resources/configs/config-methods-test.yaml b/flux/flux-core/src/test/resources/configs/config-methods-test.yaml
new file mode 100644
index 0000000..7c4ffb3
--- /dev/null
+++ b/flux/flux-core/src/test/resources/configs/config-methods-test.yaml
@@ -0,0 +1,92 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+---
+name: "yaml-topology"
+
+#
+config:
+  topology.workers: 1
+  # ...
+
+components:
+  - id: "foo"
+    className: "org.apache.storm.flux.test.TestBolt$TestClass"
+    constructorArgs:
+      - "foo"
+  - id: "bar"
+    className: "org.apache.storm.flux.test.TestBolt$TestClass"
+    constructorArgs:
+      - "bar"
+  - id: "baz"
+    className: "org.apache.storm.flux.test.TestBolt$TestClass"
+    constructorArgs:
+      - "baz"
+
+# spout definitions
+spouts:
+  - id: "spout-1"
+    className: "org.apache.storm.testing.TestWordSpout"
+    parallelism: 1
+    # ...
+
+# bolt definitions
+bolts:
+  - id: "bolt-1"
+    className: "org.apache.storm.flux.test.TestBolt"
+    parallelism: 1
+    constructorArgs:
+      - FOO # enum class
+      - 1.0
+      - true
+      - reflist: ["foo", "bar"]
+    configMethods:
+      - name: "withFoo"
+        args:
+          - "foo"
+      - name: "withNone"
+      - name: "withBar"
+        args:
+          - "bar"
+      - name: "withFooBar"
+        args:
+          - "foo"
+          - "bar"
+      - name: "withClasses"
+        args:
+          - reflist:
+            - "foo"
+            - "bar"
+            - "baz"
+
+
+#stream definitions
+# stream definitions define connections between spouts and bolts.
+# note that such connections can be cyclical
+streams:
+  - name: "spout-1 --> bolt-1" # name isn't used (placeholder for logging, UI, etc.)
+#    id: "connection-1"
+    from: "spout-1"
+    to: "bolt-1"
+    grouping:
+      type: SHUFFLE
+
+
+
+
+
+
+
+

http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/flux/flux-core/src/test/resources/configs/diamond-topology.yaml
----------------------------------------------------------------------
diff --git a/flux/flux-core/src/test/resources/configs/diamond-topology.yaml b/flux/flux-core/src/test/resources/configs/diamond-topology.yaml
new file mode 100644
index 0000000..957c258
--- /dev/null
+++ b/flux/flux-core/src/test/resources/configs/diamond-topology.yaml
@@ -0,0 +1,87 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+---
+
+# topology definition
+# name to be used when submitting
+name: "diamond-topology"
+
+# topology configuration
+# this will be passed to the submitter as a map of config options
+#
+config:
+  topology.workers: 1
+
+# spout definitions
+spouts:
+  - id: "spout-1"
+    className: "org.apache.storm.testing.TestWordSpout"
+    parallelism: 1
+
+# bolt definitions
+bolts:
+  - id: "A"
+    className: "org.apache.storm.flux.wrappers.bolts.LogInfoBolt"
+    parallelism: 1
+
+  - id: "B"
+    className: "org.apache.storm.flux.wrappers.bolts.LogInfoBolt"
+    parallelism: 1
+
+  - id: "C"
+    className: "org.apache.storm.flux.wrappers.bolts.LogInfoBolt"
+    parallelism: 1
+
+  - id: "D"
+    className: "org.apache.storm.flux.wrappers.bolts.LogInfoBolt"
+    parallelism: 1
+
+#stream definitions
+# stream definitions define connections between spouts and bolts.
+# note that such connections can be cyclical
+streams:
+  - name: "spout-1 --> bolt-1" # name isn't used (placeholder for logging, UI, etc.)
+#    id: "connection-1"
+    from: "spout-1"
+    to: "A"
+    grouping:
+      type: FIELDS
+      args: ["word"]
+
+  - from: "A"
+    to: "B"
+    grouping:
+      type: SHUFFLE
+
+  - from: "A"
+    to: "C"
+    grouping:
+      type: SHUFFLE
+
+  - from: "C"
+    to: "D"
+    grouping:
+      type: SHUFFLE
+
+  - from: "B"
+    to: "D"
+    grouping:
+      type: SHUFFLE
+
+
+
+

http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/flux/flux-core/src/test/resources/configs/existing-topology-method-override.yaml
----------------------------------------------------------------------
diff --git a/flux/flux-core/src/test/resources/configs/existing-topology-method-override.yaml b/flux/flux-core/src/test/resources/configs/existing-topology-method-override.yaml
new file mode 100644
index 0000000..fceeeed
--- /dev/null
+++ b/flux/flux-core/src/test/resources/configs/existing-topology-method-override.yaml
@@ -0,0 +1,25 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+---
+
+# configuration that uses an existing topology that does not implement TopologySource
+name: "existing-topology"
+topologySource:
+  className: "org.apache.storm.flux.test.SimpleTopology"
+  methodName: "getTopologyWithDifferentMethodName"
+  constructorArgs:
+    - "foo"
+    - "bar"
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/flux/flux-core/src/test/resources/configs/existing-topology-reflection-config.yaml
----------------------------------------------------------------------
diff --git a/flux/flux-core/src/test/resources/configs/existing-topology-reflection-config.yaml b/flux/flux-core/src/test/resources/configs/existing-topology-reflection-config.yaml
new file mode 100644
index 0000000..440fe4d
--- /dev/null
+++ b/flux/flux-core/src/test/resources/configs/existing-topology-reflection-config.yaml
@@ -0,0 +1,24 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+---
+
+# configuration that uses an existing topology that does not implement TopologySource
+name: "existing-topology"
+topologySource:
+  className: "org.apache.storm.flux.test.SimpleTopologyWithConfigParam"
+  constructorArgs:
+    - "foo"
+    - "bar"
\ No newline at end of file