You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by pt...@apache.org on 2015/06/04 04:06:07 UTC

[25/50] [abbrv] storm git commit: merge flux into external/flux/

http://git-wip-us.apache.org/repos/asf/storm/blob/b21a98dd/external/flux/flux-core/src/main/java/org/apache/storm/flux/FluxBuilder.java
----------------------------------------------------------------------
diff --cc external/flux/flux-core/src/main/java/org/apache/storm/flux/FluxBuilder.java
index 0000000,0000000..57237b6
new file mode 100644
--- /dev/null
+++ b/external/flux/flux-core/src/main/java/org/apache/storm/flux/FluxBuilder.java
@@@ -1,0 -1,0 +1,591 @@@
++/*
++ * Licensed to the Apache Software Foundation (ASF) under one
++ * or more contributor license agreements.  See the NOTICE file
++ * distributed with this work for additional information
++ * regarding copyright ownership.  The ASF licenses this file
++ * to you under the Apache License, Version 2.0 (the
++ * "License"); you may not use this file except in compliance
++ * with the License.  You may obtain a copy of the License at
++ *
++ * http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing, software
++ * distributed under the License is distributed on an "AS IS" BASIS,
++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
++ * See the License for the specific language governing permissions and
++ * limitations under the License.
++ */
++package org.apache.storm.flux;
++
++import backtype.storm.Config;
++import backtype.storm.generated.StormTopology;
++import backtype.storm.grouping.CustomStreamGrouping;
++import backtype.storm.topology.*;
++import backtype.storm.tuple.Fields;
++import backtype.storm.utils.Utils;
++import org.apache.storm.flux.api.TopologySource;
++import org.apache.storm.flux.model.*;
++import org.slf4j.Logger;
++import org.slf4j.LoggerFactory;
++
++import java.lang.reflect.*;
++import java.util.ArrayList;
++import java.util.Collection;
++import java.util.List;
++import java.util.Map;
++
++public class FluxBuilder {
++    private static Logger LOG = LoggerFactory.getLogger(FluxBuilder.class);
++
++    /**
++     * Given a topology definition, return a populated `backtype.storm.Config` instance.
++     *
++     * @param topologyDef
++     * @return
++     */
++    public static Config buildConfig(TopologyDef topologyDef) {
++        // merge contents of `config` into topology config
++        Config conf = new Config();
++        conf.putAll(topologyDef.getConfig());
++        return conf;
++    }
++
++    /**
++     * Given a topology definition, return a Storm topology that can be run either locally or remotely.
++     *
++     * @param context
++     * @return
++     * @throws IllegalAccessException
++     * @throws InstantiationException
++     * @throws ClassNotFoundException
++     * @throws NoSuchMethodException
++     * @throws InvocationTargetException
++     */
++    static StormTopology buildTopology(ExecutionContext context) throws IllegalAccessException,
++            InstantiationException, ClassNotFoundException, NoSuchMethodException, InvocationTargetException {
++
++        StormTopology topology = null;
++        TopologyDef topologyDef = context.getTopologyDef();
++
++        if(!topologyDef.validate()){
++            throw new IllegalArgumentException("Invalid topology config. Spouts, bolts and streams cannot be " +
++                    "defined in the same configuration as a topologySource.");
++        }
++
++        // build components that may be referenced by spouts, bolts, etc.
++        // the map will be a String --> Object where the object is a fully
++        // constructed class instance
++        buildComponents(context);
++
++        if(topologyDef.isDslTopology()) {
++            // This is a DSL (YAML, etc.) topology...
++            LOG.info("Detected DSL topology...");
++
++            TopologyBuilder builder = new TopologyBuilder();
++
++            // create spouts
++            buildSpouts(context, builder);
++
++            // we need to be able to lookup bolts by id, then switch based
++            // on whether they are IBasicBolt or IRichBolt instances
++            buildBolts(context);
++
++            // process stream definitions
++            buildStreamDefinitions(context, builder);
++
++            topology = builder.createTopology();
++        } else {
++            // user class supplied...
++            // this also provides a bridge to Trident...
++            LOG.info("A topology source has been specified...");
++            ObjectDef def = topologyDef.getTopologySource();
++            topology = buildExternalTopology(def, context);
++        }
++        return topology;
++    }
++
++    /**
++     * Given a `java.lang.Object` instance and a method name, attempt to find a method that matches the input
++     * parameter: `java.util.Map` or `backtype.storm.Config`.
++     *
++     * @param topologySource object to inspect for the specified method
++     * @param methodName name of the method to look for
++     * @return
++     * @throws NoSuchMethodException
++     */
++    private static Method findGetTopologyMethod(Object topologySource, String methodName) throws NoSuchMethodException {
++        Class clazz = topologySource.getClass();
++        Method[] methods =  clazz.getMethods();
++        ArrayList<Method> candidates = new ArrayList<Method>();
++        for(Method method : methods){
++            if(!method.getName().equals(methodName)){
++                continue;
++            }
++            if(!method.getReturnType().equals(StormTopology.class)){
++                continue;
++            }
++            Class[] paramTypes = method.getParameterTypes();
++            if(paramTypes.length != 1){
++                continue;
++            }
++            if(paramTypes[0].isAssignableFrom(Map.class) || paramTypes[0].isAssignableFrom(Config.class)){
++                candidates.add(method);
++            }
++        }
++
++        if(candidates.size() == 0){
++            throw new IllegalArgumentException("Unable to find method '" + methodName + "' method in class: " + clazz.getName());
++        } else if (candidates.size() > 1){
++            LOG.warn("Found multiple candidate methods in class '" + clazz.getName() + "'. Using the first one found");
++        }
++
++        return candidates.get(0);
++    }
++
++    /**
++     * @param context
++     * @param builder
++     */
++    private static void buildStreamDefinitions(ExecutionContext context, TopologyBuilder builder)
++            throws ClassNotFoundException, NoSuchMethodException, InvocationTargetException, InstantiationException,
++            IllegalAccessException {
++        TopologyDef topologyDef = context.getTopologyDef();
++        // process stream definitions
++        for (StreamDef stream : topologyDef.getStreams()) {
++            Object boltObj = context.getBolt(stream.getTo());
++            BoltDeclarer declarer = null;
++            if (boltObj instanceof IRichBolt) {
++                declarer = builder.setBolt(stream.getTo(),
++                        (IRichBolt) boltObj,
++                        topologyDef.parallelismForBolt(stream.getTo()));
++            } else if (boltObj instanceof IBasicBolt) {
++                declarer = builder.setBolt(
++                        stream.getTo(),
++                        (IBasicBolt) boltObj,
++                        topologyDef.parallelismForBolt(stream.getTo()));
++            } else {
++                throw new IllegalArgumentException("Class does not appear to be a bolt: " +
++                        boltObj.getClass().getName());
++            }
++
++            GroupingDef grouping = stream.getGrouping();
++            // if the streamId is defined, use it for the grouping, otherwise assume storm's default stream
++            String streamId = (grouping.getStreamId() == null ? Utils.DEFAULT_STREAM_ID : grouping.getStreamId());
++
++
++            switch (grouping.getType()) {
++                case SHUFFLE:
++                    declarer.shuffleGrouping(stream.getFrom(), streamId);
++                    break;
++                case FIELDS:
++                    //TODO check for null grouping args
++                    declarer.fieldsGrouping(stream.getFrom(), streamId, new Fields(grouping.getArgs()));
++                    break;
++                case ALL:
++                    declarer.allGrouping(stream.getFrom(), streamId);
++                    break;
++                case DIRECT:
++                    declarer.directGrouping(stream.getFrom(), streamId);
++                    break;
++                case GLOBAL:
++                    declarer.globalGrouping(stream.getFrom(), streamId);
++                    break;
++                case LOCAL_OR_SHUFFLE:
++                    declarer.localOrShuffleGrouping(stream.getFrom(), streamId);
++                    break;
++                case NONE:
++                    declarer.noneGrouping(stream.getFrom(), streamId);
++                    break;
++                case CUSTOM:
++                    declarer.customGrouping(stream.getFrom(), streamId,
++                            buildCustomStreamGrouping(stream.getGrouping().getCustomClass(), context));
++                    break;
++                default:
++                    throw new UnsupportedOperationException("unsupported grouping type: " + grouping);
++            }
++        }
++    }
++
++    private static void applyProperties(ObjectDef bean, Object instance, ExecutionContext context) throws
++            IllegalAccessException, InvocationTargetException {
++        List<PropertyDef> props = bean.getProperties();
++        Class clazz = instance.getClass();
++        if (props != null) {
++            for (PropertyDef prop : props) {
++                Object value = prop.isReference() ? context.getComponent(prop.getRef()) : prop.getValue();
++                Method setter = findSetter(clazz, prop.getName(), value);
++                if (setter != null) {
++                    LOG.debug("found setter, attempting to invoke");
++                    // invoke setter
++                    setter.invoke(instance, new Object[]{value});
++                } else {
++                    // look for a public instance variable
++                    LOG.debug("no setter found. Looking for a public instance variable...");
++                    Field field = findPublicField(clazz, prop.getName(), value);
++                    if (field != null) {
++                        field.set(instance, value);
++                    }
++                }
++            }
++        }
++    }
++
++    private static Field findPublicField(Class clazz, String property, Object arg) {
++        Field field = null;
++        try {
++            field = clazz.getField(property);
++        } catch (NoSuchFieldException e) {
++            LOG.warn("Could not find setter or public variable for property: " + property, e);
++        }
++        return field;
++    }
++
++    private static Method findSetter(Class clazz, String property, Object arg) {
++        String setterName = toSetterName(property);
++        Method retval = null;
++        Method[] methods = clazz.getMethods();
++        for (Method method : methods) {
++            if (setterName.equals(method.getName())) {
++                LOG.debug("Found setter method: " + method.getName());
++                retval = method;
++            }
++        }
++        return retval;
++    }
++
++    private static String toSetterName(String name) {
++        return "set" + name.substring(0, 1).toUpperCase() + name.substring(1, name.length());
++    }
++
++    private static List<Object> resolveReferences(List<Object> args, ExecutionContext context) {
++        LOG.debug("Checking arguments for references.");
++        List<Object> cArgs = new ArrayList<Object>();
++        // resolve references
++        for (Object arg : args) {
++            if (arg instanceof BeanReference) {
++                cArgs.add(context.getComponent(((BeanReference) arg).getId()));
++            } else {
++                cArgs.add(arg);
++            }
++        }
++        return cArgs;
++    }
++
++    private static Object buildObject(ObjectDef def, ExecutionContext context) throws ClassNotFoundException,
++            IllegalAccessException, InstantiationException, NoSuchMethodException, InvocationTargetException {
++        Class clazz = Class.forName(def.getClassName());
++        Object obj = null;
++        if (def.hasConstructorArgs()) {
++            LOG.debug("Found constructor arguments in definition: " + def.getConstructorArgs().getClass().getName());
++            List<Object> cArgs = def.getConstructorArgs();
++            if(def.hasReferences()){
++                cArgs = resolveReferences(cArgs, context);
++            }
++            Constructor con = findCompatibleConstructor(cArgs, clazz);
++            if (con != null) {
++                LOG.debug("Found something seemingly compatible, attempting invocation...");
++                obj = con.newInstance(getArgsWithListCoercian(cArgs, con.getParameterTypes()));
++            } else {
++                String msg = String.format("Couldn't find a suitable constructor for class '%s' with arguments '%s'.",
++                        clazz.getName(),
++                        cArgs);
++                throw new IllegalArgumentException(msg);
++            }
++        } else {
++            obj = clazz.newInstance();
++        }
++        applyProperties(def, obj, context);
++        invokeConfigMethods(def, obj, context);
++        return obj;
++    }
++
++    private static StormTopology buildExternalTopology(ObjectDef def, ExecutionContext context)
++            throws ClassNotFoundException, IllegalAccessException, InstantiationException, NoSuchMethodException,
++            InvocationTargetException {
++
++        Object topologySource = buildObject(def, context);
++
++        String methodName = context.getTopologyDef().getTopologySource().getMethodName();
++        Method getTopology = findGetTopologyMethod(topologySource, methodName);
++        if(getTopology.getParameterTypes()[0].equals(Config.class)){
++            Config config = new Config();
++            config.putAll(context.getTopologyDef().getConfig());
++            return (StormTopology) getTopology.invoke(topologySource, config);
++        } else {
++            return (StormTopology) getTopology.invoke(topologySource, context.getTopologyDef().getConfig());
++        }
++    }
++
++    private static CustomStreamGrouping buildCustomStreamGrouping(ObjectDef def, ExecutionContext context)
++            throws ClassNotFoundException,
++            IllegalAccessException, InstantiationException, NoSuchMethodException, InvocationTargetException {
++        Object grouping = buildObject(def, context);
++        return (CustomStreamGrouping)grouping;
++    }
++
++    /**
++     * Given a topology definition, resolve and instantiate all components found and return a map
++     * keyed by the component id.
++     */
++    private static void buildComponents(ExecutionContext context) throws ClassNotFoundException, NoSuchMethodException,
++            IllegalAccessException, InvocationTargetException, InstantiationException {
++        Collection<BeanDef> cDefs = context.getTopologyDef().getComponents();
++        if (cDefs != null) {
++            for (BeanDef bean : cDefs) {
++                Object obj = buildObject(bean, context);
++                context.addComponent(bean.getId(), obj);
++            }
++        }
++    }
++
++
++    private static void buildSpouts(ExecutionContext context, TopologyBuilder builder) throws ClassNotFoundException,
++            NoSuchMethodException, InvocationTargetException, InstantiationException, IllegalAccessException {
++        for (SpoutDef sd : context.getTopologyDef().getSpouts()) {
++            IRichSpout spout = buildSpout(sd, context);
++            builder.setSpout(sd.getId(), spout, sd.getParallelism());
++            context.addSpout(sd.getId(), spout);
++        }
++    }
++
++    /**
++     * Given a spout definition, return a Storm spout implementation by attempting to find a matching constructor
++     * in the given spout class. Perform list to array conversion as necessary.
++     */
++    private static IRichSpout buildSpout(SpoutDef def, ExecutionContext context) throws ClassNotFoundException,
++            IllegalAccessException, InstantiationException, NoSuchMethodException, InvocationTargetException {
++        return (IRichSpout)buildObject(def, context);
++    }
++
++    /**
++     * Given a list of bolt definitions, build a map of Storm bolts with the bolt definition id as the key.
++     * Attempt to coerce the given constructor arguments to a matching bolt constructor as much as possible.
++     */
++    private static void buildBolts(ExecutionContext context) throws ClassNotFoundException, IllegalAccessException,
++            InstantiationException, NoSuchMethodException, InvocationTargetException {
++        for (BoltDef def : context.getTopologyDef().getBolts()) {
++            Class clazz = Class.forName(def.getClassName());
++            Object bolt = buildObject(def, context);
++            context.addBolt(def.getId(), bolt);
++        }
++    }
++
++    /**
++     * Given a list of constructor arguments, and a target class, attempt to find a suitable constructor.
++     *
++     */
++    private static Constructor findCompatibleConstructor(List<Object> args, Class target) throws NoSuchMethodException {
++        Constructor retval = null;
++        int eligibleCount = 0;
++
++        LOG.debug("Target class: {}", target.getName());
++        Constructor[] cons = target.getDeclaredConstructors();
++
++        for (Constructor con : cons) {
++            Class[] paramClasses = con.getParameterTypes();
++            if (paramClasses.length == args.size()) {
++                LOG.debug("found constructor with same number of args..");
++                boolean invokable = canInvokeWithArgs(args, con.getParameterTypes());
++                if (invokable) {
++                    retval = con;
++                    eligibleCount++;
++                }
++                LOG.debug("** invokable --> {}", invokable);
++            } else {
++                LOG.debug("Skipping constructor with wrong number of arguments.");
++            }
++        }
++        if (eligibleCount > 1) {
++            LOG.warn("Found multiple invokable constructors for class {}, given arguments {}. Using the last one found.",
++                    target, args);
++        }
++        return retval;
++    }
++
++
++    public static void invokeConfigMethods(ObjectDef bean, Object instance, ExecutionContext context)
++            throws InvocationTargetException, IllegalAccessException {
++
++        List<ConfigMethodDef> methodDefs = bean.getConfigMethods();
++        if(methodDefs == null || methodDefs.size() == 0){
++            return;
++        }
++        Class clazz = instance.getClass();
++        for(ConfigMethodDef methodDef : methodDefs){
++            List<Object> args = methodDef.getArgs();
++            if(methodDef.hasReferences()){
++                args = resolveReferences(args, context);
++            }
++            String methodName = methodDef.getName();
++            Method method = findCompatibleMethod(args, clazz, methodName);
++            if(method != null) {
++                Object[] methodArgs = getArgsWithListCoercian(args, method.getParameterTypes());
++                method.invoke(instance, methodArgs);
++            } else {
++                String msg = String.format("Unable to find configuration method '%s' in class '%s' with arguments %s.",
++                        new Object[]{methodName, clazz.getName(), args});
++                throw new IllegalArgumentException(msg);
++            }
++        }
++    }
++
++    private static Method findCompatibleMethod(List<Object> args, Class target, String methodName){
++        Method retval = null;
++        int eligibleCount = 0;
++
++        LOG.debug("Target class: {}", target.getName());
++        Method[] methods = target.getMethods();
++
++        for (Method method : methods) {
++            Class[] paramClasses = method.getParameterTypes();
++            if (paramClasses.length == args.size() && method.getName().equals(methodName)) {
++                LOG.debug("found constructor with same number of args..");
++                boolean invokable = canInvokeWithArgs(args, method.getParameterTypes());
++                if (invokable) {
++                    retval = method;
++                    eligibleCount++;
++                }
++                LOG.debug("** invokable --> {}", invokable);
++            } else {
++                LOG.debug("Skipping method with wrong number of arguments.");
++            }
++        }
++        if (eligibleCount > 1) {
++            LOG.warn("Found multiple invokable methods for class {}, method {}, given arguments {}. " +
++                            "Using the last one found.",
++                            new Object[]{target, methodName, args});
++        }
++        return retval;
++    }
++
++    /**
++     * Given a java.util.List of contructor/method arguments, and a list of parameter types, attempt to convert the
++     * list to an java.lang.Object array that can be used to invoke the constructor. If an argument needs
++     * to be coerced from a List to an Array, do so.
++     */
++    private static Object[] getArgsWithListCoercian(List<Object> args, Class[] parameterTypes) {
++//        Class[] parameterTypes = constructor.getParameterTypes();
++        if (parameterTypes.length != args.size()) {
++            throw new IllegalArgumentException("Contructor parameter count does not egual argument size.");
++        }
++        Object[] constructorParams = new Object[args.size()];
++
++        // loop through the arguments, if we hit a list that has to be convered to an array,
++        // perform the conversion
++        for (int i = 0; i < args.size(); i++) {
++            Object obj = args.get(i);
++            Class paramType = parameterTypes[i];
++            Class objectType = obj.getClass();
++            LOG.debug("Comparing parameter class {} to object class {} to see if assignment is possible.",
++                    paramType, objectType);
++            if (paramType.equals(objectType)) {
++                LOG.debug("They are the same class.");
++                constructorParams[i] = args.get(i);
++                continue;
++            }
++            if (paramType.isAssignableFrom(objectType)) {
++                LOG.debug("Assignment is possible.");
++                constructorParams[i] = args.get(i);
++                continue;
++            }
++            if(isPrimitiveNumber(paramType) && Number.class.isAssignableFrom(objectType)){
++                LOG.debug("Its a primitive number.");
++                Number num = (Number)args.get(i);
++                if(paramType == Float.TYPE){
++                    constructorParams[i] = num.floatValue();
++                } else if (paramType == Double.TYPE) {
++                    constructorParams[i] = num.doubleValue();
++                } else if (paramType == Long.TYPE) {
++                    constructorParams[i] = num.longValue();
++                } else if (paramType == Integer.TYPE) {
++                    constructorParams[i] = num.intValue();
++                } else if (paramType == Short.TYPE) {
++                    constructorParams[i] = num.shortValue();
++                } else if (paramType == Byte.TYPE) {
++                    constructorParams[i] = num.byteValue();
++                } else {
++                    constructorParams[i] = args.get(i);
++                }
++                continue;
++            }
++
++            // enum conversion
++            if(paramType.isEnum() && objectType.equals(String.class)){
++                LOG.debug("Yes, will convert a String to enum");
++                constructorParams[i] = Enum.valueOf(paramType, (String)args.get(i));
++                continue;
++            }
++
++            // List to array conversion
++            if (paramType.isArray() && List.class.isAssignableFrom(objectType)) {
++                // TODO more collection content type checking
++                LOG.debug("Conversion appears possible...");
++                List list = (List) obj;
++                LOG.debug("Array Type: {}, List type: {}", paramType.getComponentType(), list.get(0).getClass());
++
++                // create an array of the right type
++                Object newArrayObj = Array.newInstance(paramType.getComponentType(), list.size());
++                for (int j = 0; j < list.size(); j++) {
++                    Array.set(newArrayObj, j, list.get(j));
++
++                }
++                constructorParams[i] = newArrayObj;
++                LOG.debug("After conversion: {}", constructorParams[i]);
++            }
++        }
++        return constructorParams;
++    }
++
++
++    /**
++     * Determine if the given constructor/method parameter types are compatible given arguments List. Consider if
++     * list coercian can make it possible.
++     *
++     * @param args
++     * @param parameterTypes
++     * @return
++     */
++    private static boolean canInvokeWithArgs(List<Object> args, Class[] parameterTypes) {
++        if (parameterTypes.length != args.size()) {
++            LOG.warn("parameter types were the wrong size");
++            return false;
++        }
++
++        for (int i = 0; i < args.size(); i++) {
++            Object obj = args.get(i);
++            Class paramType = parameterTypes[i];
++            Class objectType = obj.getClass();
++            LOG.debug("Comparing parameter class {} to object class {} to see if assignment is possible.",
++                    paramType, objectType);
++            if (paramType.equals(objectType)) {
++                LOG.debug("Yes, they are the same class.");
++                return true;
++            }
++            if (paramType.isAssignableFrom(objectType)) {
++                LOG.debug("Yes, assignment is possible.");
++                return true;
++            }
++            if(isPrimitiveNumber(paramType) && Number.class.isAssignableFrom(objectType)){
++                return true;
++            }
++            if(paramType.isEnum() && objectType.equals(String.class)){
++                LOG.debug("Yes, will convert a String to enum");
++                return true;
++            }
++            if (paramType.isArray() && List.class.isAssignableFrom(objectType)) {
++                // TODO more collection content type checking
++                LOG.debug("Assignment is possible if we convert a List to an array.");
++                LOG.debug("Array Type: {}, List type: {}", paramType.getComponentType(), ((List) obj).get(0).getClass());
++
++                return true;
++            }
++            return false;
++        }
++        return false;
++    }
++
++    public static boolean isPrimitiveNumber(Class clazz){
++        return clazz.isPrimitive() && !clazz.equals(boolean.class);
++    }
++}
++

http://git-wip-us.apache.org/repos/asf/storm/blob/b21a98dd/external/flux/flux-core/src/main/java/org/apache/storm/flux/api/TopologySource.java
----------------------------------------------------------------------
diff --cc external/flux/flux-core/src/main/java/org/apache/storm/flux/api/TopologySource.java
index 0000000,0000000..fbccfb7
new file mode 100644
--- /dev/null
+++ b/external/flux/flux-core/src/main/java/org/apache/storm/flux/api/TopologySource.java
@@@ -1,0 -1,0 +1,39 @@@
++/*
++ * Licensed to the Apache Software Foundation (ASF) under one
++ * or more contributor license agreements.  See the NOTICE file
++ * distributed with this work for additional information
++ * regarding copyright ownership.  The ASF licenses this file
++ * to you under the Apache License, Version 2.0 (the
++ * "License"); you may not use this file except in compliance
++ * with the License.  You may obtain a copy of the License at
++ *
++ * http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing, software
++ * distributed under the License is distributed on an "AS IS" BASIS,
++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
++ * See the License for the specific language governing permissions and
++ * limitations under the License.
++ */
++package org.apache.storm.flux.api;
++
++
++import backtype.storm.generated.StormTopology;
++
++import java.util.Map;
++
++/**
++ * Marker interface for objects that can produce `StormTopology` objects.
++ *
++ * If a `topology-source` class implements the `getTopology()` method, Flux will
++ * call that method. Otherwise, it will introspect the given class and look for a
++ * similar method that produces a `StormTopology` instance.
++ *
++ * Note that it is not strictly necessary for a class to implement this interface.
++ * If a class defines a method with a similar signature, Flux should be able to find
++ * and invoke it.
++ *
++ */
++public interface TopologySource {
++    public StormTopology getTopology(Map<String, Object> config);
++}

http://git-wip-us.apache.org/repos/asf/storm/blob/b21a98dd/external/flux/flux-core/src/main/java/org/apache/storm/flux/model/BeanDef.java
----------------------------------------------------------------------
diff --cc external/flux/flux-core/src/main/java/org/apache/storm/flux/model/BeanDef.java
index 0000000,0000000..72ca5ae
new file mode 100644
--- /dev/null
+++ b/external/flux/flux-core/src/main/java/org/apache/storm/flux/model/BeanDef.java
@@@ -1,0 -1,0 +1,39 @@@
++/*
++ * Licensed to the Apache Software Foundation (ASF) under one
++ * or more contributor license agreements.  See the NOTICE file
++ * distributed with this work for additional information
++ * regarding copyright ownership.  The ASF licenses this file
++ * to you under the Apache License, Version 2.0 (the
++ * "License"); you may not use this file except in compliance
++ * with the License.  You may obtain a copy of the License at
++ *
++ * http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing, software
++ * distributed under the License is distributed on an "AS IS" BASIS,
++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
++ * See the License for the specific language governing permissions and
++ * limitations under the License.
++ */
++package org.apache.storm.flux.model;
++
++import java.util.ArrayList;
++import java.util.LinkedHashMap;
++import java.util.List;
++import java.util.Map;
++
++/**
++ * A representation of a Java object that is uniquely identifyable, and given a className, constructor arguments,
++ * and properties, can be instantiated.
++ */
++public class BeanDef extends ObjectDef {
++    private String id;
++
++    public String getId() {
++        return id;
++    }
++
++    public void setId(String id) {
++        this.id = id;
++    }
++}

http://git-wip-us.apache.org/repos/asf/storm/blob/b21a98dd/external/flux/flux-core/src/main/java/org/apache/storm/flux/model/BeanReference.java
----------------------------------------------------------------------
diff --cc external/flux/flux-core/src/main/java/org/apache/storm/flux/model/BeanReference.java
index 0000000,0000000..bd236f1
new file mode 100644
--- /dev/null
+++ b/external/flux/flux-core/src/main/java/org/apache/storm/flux/model/BeanReference.java
@@@ -1,0 -1,0 +1,39 @@@
++/*
++ * Licensed to the Apache Software Foundation (ASF) under one
++ * or more contributor license agreements.  See the NOTICE file
++ * distributed with this work for additional information
++ * regarding copyright ownership.  The ASF licenses this file
++ * to you under the Apache License, Version 2.0 (the
++ * "License"); you may not use this file except in compliance
++ * with the License.  You may obtain a copy of the License at
++ *
++ * http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing, software
++ * distributed under the License is distributed on an "AS IS" BASIS,
++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
++ * See the License for the specific language governing permissions and
++ * limitations under the License.
++ */
++package org.apache.storm.flux.model;
++
++/**
++ * A bean reference is simply a string pointer to another id.
++ */
++public class BeanReference {
++    public String id;
++
++    public BeanReference(){}
++
++    public BeanReference(String id){
++        this.id = id;
++    }
++
++    public String getId() {
++        return id;
++    }
++
++    public void setId(String id) {
++        this.id = id;
++    }
++}

http://git-wip-us.apache.org/repos/asf/storm/blob/b21a98dd/external/flux/flux-core/src/main/java/org/apache/storm/flux/model/BoltDef.java
----------------------------------------------------------------------
diff --cc external/flux/flux-core/src/main/java/org/apache/storm/flux/model/BoltDef.java
index 0000000,0000000..362abf1
new file mode 100644
--- /dev/null
+++ b/external/flux/flux-core/src/main/java/org/apache/storm/flux/model/BoltDef.java
@@@ -1,0 -1,0 +1,24 @@@
++/*
++ * Licensed to the Apache Software Foundation (ASF) under one
++ * or more contributor license agreements.  See the NOTICE file
++ * distributed with this work for additional information
++ * regarding copyright ownership.  The ASF licenses this file
++ * to you under the Apache License, Version 2.0 (the
++ * "License"); you may not use this file except in compliance
++ * with the License.  You may obtain a copy of the License at
++ *
++ * http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing, software
++ * distributed under the License is distributed on an "AS IS" BASIS,
++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
++ * See the License for the specific language governing permissions and
++ * limitations under the License.
++ */
++package org.apache.storm.flux.model;
++
++/**
++ * Bean representation of a Storm bolt.
++ */
++public class BoltDef extends VertexDef {
++}

http://git-wip-us.apache.org/repos/asf/storm/blob/b21a98dd/external/flux/flux-core/src/main/java/org/apache/storm/flux/model/ConfigMethodDef.java
----------------------------------------------------------------------
diff --cc external/flux/flux-core/src/main/java/org/apache/storm/flux/model/ConfigMethodDef.java
index 0000000,0000000..6f7e4d4
new file mode 100644
--- /dev/null
+++ b/external/flux/flux-core/src/main/java/org/apache/storm/flux/model/ConfigMethodDef.java
@@@ -1,0 -1,0 +1,62 @@@
++/*
++ * Licensed to the Apache Software Foundation (ASF) under one
++ * or more contributor license agreements.  See the NOTICE file
++ * distributed with this work for additional information
++ * regarding copyright ownership.  The ASF licenses this file
++ * to you under the Apache License, Version 2.0 (the
++ * "License"); you may not use this file except in compliance
++ * with the License.  You may obtain a copy of the License at
++ *
++ * http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing, software
++ * distributed under the License is distributed on an "AS IS" BASIS,
++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
++ * See the License for the specific language governing permissions and
++ * limitations under the License.
++ */
++package org.apache.storm.flux.model;
++
++import java.util.ArrayList;
++import java.util.LinkedHashMap;
++import java.util.List;
++import java.util.Map;
++
++public class ConfigMethodDef {
++    private String name;
++    private List<Object> args;
++    private boolean hasReferences = false;
++
++    public String getName() {
++        return name;
++    }
++
++    public void setName(String name) {
++        this.name = name;
++    }
++
++    public List<Object> getArgs() {
++        return args;
++    }
++
++    public void setArgs(List<Object> args) {
++
++        List<Object> newVal = new ArrayList<Object>();
++        for(Object obj : args){
++            if(obj instanceof LinkedHashMap){
++                Map map = (Map)obj;
++                if(map.containsKey("ref") && map.size() == 1){
++                    newVal.add(new BeanReference((String)map.get("ref")));
++                    this.hasReferences = true;
++                }
++            } else {
++                newVal.add(obj);
++            }
++        }
++        this.args = newVal;
++    }
++
++    public boolean hasReferences(){
++        return this.hasReferences;
++    }
++}

http://git-wip-us.apache.org/repos/asf/storm/blob/b21a98dd/external/flux/flux-core/src/main/java/org/apache/storm/flux/model/ExecutionContext.java
----------------------------------------------------------------------
diff --cc external/flux/flux-core/src/main/java/org/apache/storm/flux/model/ExecutionContext.java
index 0000000,0000000..e94b887
new file mode 100644
--- /dev/null
+++ b/external/flux/flux-core/src/main/java/org/apache/storm/flux/model/ExecutionContext.java
@@@ -1,0 -1,0 +1,77 @@@
++/*
++ * Licensed to the Apache Software Foundation (ASF) under one
++ * or more contributor license agreements.  See the NOTICE file
++ * distributed with this work for additional information
++ * regarding copyright ownership.  The ASF licenses this file
++ * to you under the Apache License, Version 2.0 (the
++ * "License"); you may not use this file except in compliance
++ * with the License.  You may obtain a copy of the License at
++ *
++ * http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing, software
++ * distributed under the License is distributed on an "AS IS" BASIS,
++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
++ * See the License for the specific language governing permissions and
++ * limitations under the License.
++ */
++package org.apache.storm.flux.model;
++
++import backtype.storm.Config;
++import backtype.storm.task.IBolt;
++import backtype.storm.topology.IRichSpout;
++
++import java.util.HashMap;
++import java.util.List;
++import java.util.Map;
++
++/**
++ * Container for all the objects required to instantiate a topology.
++ */
++public class ExecutionContext {
++    // parsed Topology definition
++    TopologyDef topologyDef;
++
++    // Storm config
++    private Config config;
++
++    // components
++    private List<Object> compontents;
++    // indexed by id
++    private Map<String, Object> componentMap = new HashMap<String, Object>();
++
++    private Map<String, IRichSpout> spoutMap = new HashMap<String, IRichSpout>();
++
++    private List<IBolt> bolts;
++    private Map<String, Object> boltMap = new HashMap<String, Object>();
++
++    public ExecutionContext(TopologyDef topologyDef, Config config){
++        this.topologyDef = topologyDef;
++        this.config = config;
++    }
++
++    public TopologyDef getTopologyDef(){
++        return this.topologyDef;
++    }
++
++    public void addSpout(String id, IRichSpout spout){
++        this.spoutMap.put(id, spout);
++    }
++
++    public void addBolt(String id, Object bolt){
++        this.boltMap.put(id, bolt);
++    }
++
++    public Object getBolt(String id){
++        return this.boltMap.get(id);
++    }
++
++    public void addComponent(String id, Object value){
++        this.componentMap.put(id, value);
++    }
++
++    public Object getComponent(String id){
++        return this.componentMap.get(id);
++    }
++
++}

http://git-wip-us.apache.org/repos/asf/storm/blob/b21a98dd/external/flux/flux-core/src/main/java/org/apache/storm/flux/model/GroupingDef.java
----------------------------------------------------------------------
diff --cc external/flux/flux-core/src/main/java/org/apache/storm/flux/model/GroupingDef.java
index 0000000,0000000..e4fac8e
new file mode 100644
--- /dev/null
+++ b/external/flux/flux-core/src/main/java/org/apache/storm/flux/model/GroupingDef.java
@@@ -1,0 -1,0 +1,77 @@@
++/*
++ * Licensed to the Apache Software Foundation (ASF) under one
++ * or more contributor license agreements.  See the NOTICE file
++ * distributed with this work for additional information
++ * regarding copyright ownership.  The ASF licenses this file
++ * to you under the Apache License, Version 2.0 (the
++ * "License"); you may not use this file except in compliance
++ * with the License.  You may obtain a copy of the License at
++ *
++ * http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing, software
++ * distributed under the License is distributed on an "AS IS" BASIS,
++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
++ * See the License for the specific language governing permissions and
++ * limitations under the License.
++ */
++package org.apache.storm.flux.model;
++
++import java.util.List;
++
++/**
++ * Bean representation of a Storm stream grouping.
++ */
++public class GroupingDef {
++
++    /**
++     * Types of stream groupings Storm allows
++     */
++    public static enum Type {
++        ALL,
++        CUSTOM,
++        DIRECT,
++        SHUFFLE,
++        LOCAL_OR_SHUFFLE,
++        FIELDS,
++        GLOBAL,
++        NONE
++    }
++
++    private Type type;
++    private String streamId;
++    private List<String> args;
++    private ObjectDef customClass;
++
++    public List<String> getArgs() {
++        return args;
++    }
++
++    public void setArgs(List<String> args) {
++        this.args = args;
++    }
++
++    public Type getType() {
++        return type;
++    }
++
++    public void setType(Type type) {
++        this.type = type;
++    }
++
++    public String getStreamId() {
++        return streamId;
++    }
++
++    public void setStreamId(String streamId) {
++        this.streamId = streamId;
++    }
++
++    public ObjectDef getCustomClass() {
++        return customClass;
++    }
++
++    public void setCustomClass(ObjectDef customClass) {
++        this.customClass = customClass;
++    }
++}

http://git-wip-us.apache.org/repos/asf/storm/blob/b21a98dd/external/flux/flux-core/src/main/java/org/apache/storm/flux/model/IncludeDef.java
----------------------------------------------------------------------
diff --cc external/flux/flux-core/src/main/java/org/apache/storm/flux/model/IncludeDef.java
index 0000000,0000000..23fd9d2
new file mode 100644
--- /dev/null
+++ b/external/flux/flux-core/src/main/java/org/apache/storm/flux/model/IncludeDef.java
@@@ -1,0 -1,0 +1,54 @@@
++/*
++ * Licensed to the Apache Software Foundation (ASF) under one
++ * or more contributor license agreements.  See the NOTICE file
++ * distributed with this work for additional information
++ * regarding copyright ownership.  The ASF licenses this file
++ * to you under the Apache License, Version 2.0 (the
++ * "License"); you may not use this file except in compliance
++ * with the License.  You may obtain a copy of the License at
++ *
++ * http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing, software
++ * distributed under the License is distributed on an "AS IS" BASIS,
++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
++ * See the License for the specific language governing permissions and
++ * limitations under the License.
++ */
++package org.apache.storm.flux.model;
++
++/**
++ * Represents an include. Includes can be either a file or a classpath resource.
++ *
++ * If an include is marked as `override=true` then existing properties will be replaced.
++ *
++ */
++public class IncludeDef {
++    private boolean resource = false;
++    boolean override = false;
++    private String file;
++
++    public boolean isResource() {
++        return resource;
++    }
++
++    public void setResource(boolean resource) {
++        this.resource = resource;
++    }
++
++    public String getFile() {
++        return file;
++    }
++
++    public void setFile(String file) {
++        this.file = file;
++    }
++
++    public boolean isOverride() {
++        return override;
++    }
++
++    public void setOverride(boolean override) {
++        this.override = override;
++    }
++}

http://git-wip-us.apache.org/repos/asf/storm/blob/b21a98dd/external/flux/flux-core/src/main/java/org/apache/storm/flux/model/ObjectDef.java
----------------------------------------------------------------------
diff --cc external/flux/flux-core/src/main/java/org/apache/storm/flux/model/ObjectDef.java
index 0000000,0000000..7386900
new file mode 100644
--- /dev/null
+++ b/external/flux/flux-core/src/main/java/org/apache/storm/flux/model/ObjectDef.java
@@@ -1,0 -1,0 +1,90 @@@
++/*
++ * Licensed to the Apache Software Foundation (ASF) under one
++ * or more contributor license agreements.  See the NOTICE file
++ * distributed with this work for additional information
++ * regarding copyright ownership.  The ASF licenses this file
++ * to you under the Apache License, Version 2.0 (the
++ * "License"); you may not use this file except in compliance
++ * with the License.  You may obtain a copy of the License at
++ *
++ * http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing, software
++ * distributed under the License is distributed on an "AS IS" BASIS,
++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
++ * See the License for the specific language governing permissions and
++ * limitations under the License.
++ */
++package org.apache.storm.flux.model;
++
++import backtype.storm.Config;
++
++import java.util.ArrayList;
++import java.util.LinkedHashMap;
++import java.util.List;
++import java.util.Map;
++
++/**
++ * A representation of a Java object that given a className, constructor arguments,
++ * and properties, can be instantiated.
++ */
++public class ObjectDef {
++    private String className;
++    private List<Object> constructorArgs;
++    private boolean hasReferences;
++    private List<PropertyDef> properties;
++    private List<ConfigMethodDef> configMethods;
++
++    public String getClassName() {
++        return className;
++    }
++
++    public void setClassName(String className) {
++        this.className = className;
++    }
++
++    public List<Object> getConstructorArgs() {
++        return constructorArgs;
++    }
++
++    public void setConstructorArgs(List<Object> constructorArgs) {
++
++        List<Object> newVal = new ArrayList<Object>();
++        for(Object obj : constructorArgs){
++            if(obj instanceof LinkedHashMap){
++                Map map = (Map)obj;
++                if(map.containsKey("ref") && map.size() == 1){
++                    newVal.add(new BeanReference((String)map.get("ref")));
++                    this.hasReferences = true;
++                }
++            } else {
++                newVal.add(obj);
++            }
++        }
++        this.constructorArgs = newVal;
++    }
++
++    public boolean hasConstructorArgs(){
++        return this.constructorArgs != null && this.constructorArgs.size() > 0;
++    }
++
++    public boolean hasReferences(){
++        return this.hasReferences;
++    }
++
++    public List<PropertyDef> getProperties() {
++        return properties;
++    }
++
++    public void setProperties(List<PropertyDef> properties) {
++        this.properties = properties;
++    }
++
++    public List<ConfigMethodDef> getConfigMethods() {
++        return configMethods;
++    }
++
++    public void setConfigMethods(List<ConfigMethodDef> configMethods) {
++        this.configMethods = configMethods;
++    }
++}

http://git-wip-us.apache.org/repos/asf/storm/blob/b21a98dd/external/flux/flux-core/src/main/java/org/apache/storm/flux/model/PropertyDef.java
----------------------------------------------------------------------
diff --cc external/flux/flux-core/src/main/java/org/apache/storm/flux/model/PropertyDef.java
index 0000000,0000000..f3d7704
new file mode 100644
--- /dev/null
+++ b/external/flux/flux-core/src/main/java/org/apache/storm/flux/model/PropertyDef.java
@@@ -1,0 -1,0 +1,58 @@@
++/*
++ * Licensed to the Apache Software Foundation (ASF) under one
++ * or more contributor license agreements.  See the NOTICE file
++ * distributed with this work for additional information
++ * regarding copyright ownership.  The ASF licenses this file
++ * to you under the Apache License, Version 2.0 (the
++ * "License"); you may not use this file except in compliance
++ * with the License.  You may obtain a copy of the License at
++ *
++ * http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing, software
++ * distributed under the License is distributed on an "AS IS" BASIS,
++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
++ * See the License for the specific language governing permissions and
++ * limitations under the License.
++ */
++package org.apache.storm.flux.model;
++
++public class PropertyDef {
++    private String name;
++    private Object value;
++    private String ref;
++
++    public String getName() {
++        return name;
++    }
++
++    public void setName(String name) {
++        this.name = name;
++    }
++
++    public Object getValue() {
++        return value;
++    }
++
++    public void setValue(Object value) {
++        if(this.ref != null){
++            throw new IllegalStateException("A property can only have a value OR a reference, not both.");
++        }
++        this.value = value;
++    }
++
++    public String getRef() {
++        return ref;
++    }
++
++    public void setRef(String ref) {
++        if(this.value != null){
++            throw new IllegalStateException("A property can only have a value OR a reference, not both.");
++        }
++        this.ref = ref;
++    }
++
++    public boolean isReference(){
++        return this.ref != null;
++    }
++}

http://git-wip-us.apache.org/repos/asf/storm/blob/b21a98dd/external/flux/flux-core/src/main/java/org/apache/storm/flux/model/SpoutDef.java
----------------------------------------------------------------------
diff --cc external/flux/flux-core/src/main/java/org/apache/storm/flux/model/SpoutDef.java
index 0000000,0000000..277c601
new file mode 100644
--- /dev/null
+++ b/external/flux/flux-core/src/main/java/org/apache/storm/flux/model/SpoutDef.java
@@@ -1,0 -1,0 +1,24 @@@
++/*
++ * Licensed to the Apache Software Foundation (ASF) under one
++ * or more contributor license agreements.  See the NOTICE file
++ * distributed with this work for additional information
++ * regarding copyright ownership.  The ASF licenses this file
++ * to you under the Apache License, Version 2.0 (the
++ * "License"); you may not use this file except in compliance
++ * with the License.  You may obtain a copy of the License at
++ *
++ * http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing, software
++ * distributed under the License is distributed on an "AS IS" BASIS,
++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
++ * See the License for the specific language governing permissions and
++ * limitations under the License.
++ */
++package org.apache.storm.flux.model;
++
++/**
++ * Bean representation of a Storm spout.
++ */
++public class SpoutDef extends VertexDef {
++}

http://git-wip-us.apache.org/repos/asf/storm/blob/b21a98dd/external/flux/flux-core/src/main/java/org/apache/storm/flux/model/StreamDef.java
----------------------------------------------------------------------
diff --cc external/flux/flux-core/src/main/java/org/apache/storm/flux/model/StreamDef.java
index 0000000,0000000..da80f1c
new file mode 100644
--- /dev/null
+++ b/external/flux/flux-core/src/main/java/org/apache/storm/flux/model/StreamDef.java
@@@ -1,0 -1,0 +1,64 @@@
++/*
++ * Licensed to the Apache Software Foundation (ASF) under one
++ * or more contributor license agreements.  See the NOTICE file
++ * distributed with this work for additional information
++ * regarding copyright ownership.  The ASF licenses this file
++ * to you under the Apache License, Version 2.0 (the
++ * "License"); you may not use this file except in compliance
++ * with the License.  You may obtain a copy of the License at
++ *
++ * http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing, software
++ * distributed under the License is distributed on an "AS IS" BASIS,
++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
++ * See the License for the specific language governing permissions and
++ * limitations under the License.
++ */
++package org.apache.storm.flux.model;
++
++/**
++ * Represents a stream of tuples from one Storm component (Spout or Bolt) to another (an edge in the topology DAG).
++ *
++ * Required fields are `from` and `to`, which define the source and destination, and the stream `grouping`.
++ *
++ */
++public class StreamDef {
++
++    private String name; // not used, placeholder for GUI, etc.
++    private String from;
++    private String to;
++    private GroupingDef grouping;
++
++    public String getTo() {
++        return to;
++    }
++
++    public void setTo(String to) {
++        this.to = to;
++    }
++
++    public String getName() {
++        return name;
++    }
++
++    public void setName(String name) {
++        this.name = name;
++    }
++
++    public String getFrom() {
++        return from;
++    }
++
++    public void setFrom(String from) {
++        this.from = from;
++    }
++
++    public GroupingDef getGrouping() {
++        return grouping;
++    }
++
++    public void setGrouping(GroupingDef grouping) {
++        this.grouping = grouping;
++    }
++}

http://git-wip-us.apache.org/repos/asf/storm/blob/b21a98dd/external/flux/flux-core/src/main/java/org/apache/storm/flux/model/TopologyDef.java
----------------------------------------------------------------------
diff --cc external/flux/flux-core/src/main/java/org/apache/storm/flux/model/TopologyDef.java
index 0000000,0000000..a6ae450
new file mode 100644
--- /dev/null
+++ b/external/flux/flux-core/src/main/java/org/apache/storm/flux/model/TopologyDef.java
@@@ -1,0 -1,0 +1,216 @@@
++/*
++ * Licensed to the Apache Software Foundation (ASF) under one
++ * or more contributor license agreements.  See the NOTICE file
++ * distributed with this work for additional information
++ * regarding copyright ownership.  The ASF licenses this file
++ * to you under the Apache License, Version 2.0 (the
++ * "License"); you may not use this file except in compliance
++ * with the License.  You may obtain a copy of the License at
++ *
++ * http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing, software
++ * distributed under the License is distributed on an "AS IS" BASIS,
++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
++ * See the License for the specific language governing permissions and
++ * limitations under the License.
++ */
++package org.apache.storm.flux.model;
++
++import org.slf4j.Logger;
++import org.slf4j.LoggerFactory;
++
++import java.util.*;
++
++/**
++ * Bean represenation of a topology.
++ *
++ * It consists of the following:
++ *   1. The topology name
++ *   2. A `java.util.Map` representing the `backtype.storm.config` for the topology
++ *   3. A list of spout definitions
++ *   4. A list of bolt definitions
++ *   5. A list of stream definitions that define the flow between spouts and bolts.
++ *
++ */
++public class TopologyDef {
++    private static Logger LOG = LoggerFactory.getLogger(TopologyDef.class);
++
++    private String name;
++    private Map<String, BeanDef> componentMap = new LinkedHashMap<String, BeanDef>(); // not required
++    private List<IncludeDef> includes; // not required
++    private Map<String, Object> config = new HashMap<String, Object>();
++
++    // a "topology source" is a class that can produce a `StormTopology` thrift object.
++    private TopologySourceDef topologySource;
++
++    // the following are required if we're defining a core storm topology DAG in YAML, etc.
++    private Map<String, BoltDef> boltMap = new LinkedHashMap<String, BoltDef>();
++    private Map<String, SpoutDef> spoutMap = new LinkedHashMap<String, SpoutDef>();
++    private List<StreamDef> streams = new ArrayList<StreamDef>();
++
++
++    public String getName() {
++        return name;
++    }
++
++    public void setName(String name) {
++        this.name = name;
++    }
++
++    public void setName(String name, boolean override){
++        if(this.name == null || override){
++            this.name = name;
++        } else {
++            LOG.warn("Ignoring attempt to set property 'name' with override == false.");
++        }
++    }
++
++    public List<SpoutDef> getSpouts() {
++        ArrayList<SpoutDef> retval = new ArrayList<SpoutDef>();
++        retval.addAll(this.spoutMap.values());
++        return retval;
++    }
++
++    public void setSpouts(List<SpoutDef> spouts) {
++        this.spoutMap = new LinkedHashMap<String, SpoutDef>();
++        for(SpoutDef spout : spouts){
++            this.spoutMap.put(spout.getId(), spout);
++        }
++    }
++
++    public List<BoltDef> getBolts() {
++        ArrayList<BoltDef> retval = new ArrayList<BoltDef>();
++        retval.addAll(this.boltMap.values());
++        return retval;
++    }
++
++    public void setBolts(List<BoltDef> bolts) {
++        this.boltMap = new LinkedHashMap<String, BoltDef>();
++        for(BoltDef bolt : bolts){
++            this.boltMap.put(bolt.getId(), bolt);
++        }
++    }
++
++    public List<StreamDef> getStreams() {
++        return streams;
++    }
++
++    public void setStreams(List<StreamDef> streams) {
++        this.streams = streams;
++    }
++
++    public Map<String, Object> getConfig() {
++        return config;
++    }
++
++    public void setConfig(Map<String, Object> config) {
++        this.config = config;
++    }
++
++    public List<BeanDef> getComponents() {
++        ArrayList<BeanDef> retval = new ArrayList<BeanDef>();
++        retval.addAll(this.componentMap.values());
++        return retval;
++    }
++
++    public void setComponents(List<BeanDef> components) {
++        this.componentMap = new LinkedHashMap<String, BeanDef>();
++        for(BeanDef component : components){
++            this.componentMap.put(component.getId(), component);
++        }
++    }
++
++    public List<IncludeDef> getIncludes() {
++        return includes;
++    }
++
++    public void setIncludes(List<IncludeDef> includes) {
++        this.includes = includes;
++    }
++
++    // utility methods
++    public int parallelismForBolt(String boltId){
++        return this.boltMap.get(boltId).getParallelism();
++    }
++
++    public BoltDef getBoltDef(String id){
++        return this.boltMap.get(id);
++    }
++
++    public SpoutDef getSpoutDef(String id){
++        return this.spoutMap.get(id);
++    }
++
++    public BeanDef getComponent(String id){
++        return this.componentMap.get(id);
++    }
++
++    // used by includes implementation
++    public void addAllBolts(List<BoltDef> bolts, boolean override){
++        for(BoltDef bolt : bolts){
++            String id = bolt.getId();
++            if(this.boltMap.get(id) == null || override) {
++                this.boltMap.put(bolt.getId(), bolt);
++            } else {
++                LOG.warn("Ignoring attempt to create bolt '{}' with override == false.", id);
++            }
++        }
++    }
++
++    public void addAllSpouts(List<SpoutDef> spouts, boolean override){
++        for(SpoutDef spout : spouts){
++            String id = spout.getId();
++            if(this.spoutMap.get(id) == null || override) {
++                this.spoutMap.put(spout.getId(), spout);
++            } else {
++                LOG.warn("Ignoring attempt to create spout '{}' with override == false.", id);
++            }
++        }
++    }
++
++    public void addAllComponents(List<BeanDef> components, boolean override) {
++        for(BeanDef bean : components){
++            String id = bean.getId();
++            if(this.componentMap.get(id) == null || override) {
++                this.componentMap.put(bean.getId(), bean);
++            } else {
++                LOG.warn("Ignoring attempt to create component '{}' with override == false.", id);
++            }
++        }
++    }
++
++    public void addAllStreams(List<StreamDef> streams, boolean override) {
++        //TODO figure out how we want to deal with overrides. Users may want to add streams even when overriding other
++        // properties. For now we just add them blindly which could lead to a potentially invalid topology.
++        this.streams.addAll(streams);
++    }
++
++    public TopologySourceDef getTopologySource() {
++        return topologySource;
++    }
++
++    public void setTopologySource(TopologySourceDef topologySource) {
++        this.topologySource = topologySource;
++    }
++
++    public boolean isDslTopology(){
++        return this.topologySource == null;
++    }
++
++
++    public boolean validate(){
++        boolean hasSpouts = this.spoutMap != null && this.spoutMap.size() > 0;
++        boolean hasBolts = this.boltMap != null && this.boltMap.size() > 0;
++        boolean hasStreams = this.streams != null && this.streams.size() > 0;
++        boolean hasSpoutsBoltsStreams = hasStreams && hasBolts && hasSpouts;
++        // you cant define a topologySource and a DSL topology at the same time...
++        if (!isDslTopology() && ((hasSpouts || hasBolts || hasStreams))) {
++            return false;
++        }
++        if(isDslTopology() && (hasSpouts && hasBolts && hasStreams)) {
++            return true;
++        }
++        return true;
++    }
++}

http://git-wip-us.apache.org/repos/asf/storm/blob/b21a98dd/external/flux/flux-core/src/main/java/org/apache/storm/flux/model/TopologySourceDef.java
----------------------------------------------------------------------
diff --cc external/flux/flux-core/src/main/java/org/apache/storm/flux/model/TopologySourceDef.java
index 0000000,0000000..d6a2f57
new file mode 100644
--- /dev/null
+++ b/external/flux/flux-core/src/main/java/org/apache/storm/flux/model/TopologySourceDef.java
@@@ -1,0 -1,0 +1,36 @@@
++/*
++ * Licensed to the Apache Software Foundation (ASF) under one
++ * or more contributor license agreements.  See the NOTICE file
++ * distributed with this work for additional information
++ * regarding copyright ownership.  The ASF licenses this file
++ * to you under the Apache License, Version 2.0 (the
++ * "License"); you may not use this file except in compliance
++ * with the License.  You may obtain a copy of the License at
++ *
++ * http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing, software
++ * distributed under the License is distributed on an "AS IS" BASIS,
++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
++ * See the License for the specific language governing permissions and
++ * limitations under the License.
++ */
++package org.apache.storm.flux.model;
++
++public class TopologySourceDef extends ObjectDef {
++    public static final String DEFAULT_METHOD_NAME = "getTopology";
++
++    private String methodName;
++
++    public TopologySourceDef(){
++        this.methodName = DEFAULT_METHOD_NAME;
++    }
++
++    public String getMethodName() {
++        return methodName;
++    }
++
++    public void setMethodName(String methodName) {
++        this.methodName = methodName;
++    }
++}

http://git-wip-us.apache.org/repos/asf/storm/blob/b21a98dd/external/flux/flux-core/src/main/java/org/apache/storm/flux/model/VertexDef.java
----------------------------------------------------------------------
diff --cc external/flux/flux-core/src/main/java/org/apache/storm/flux/model/VertexDef.java
index 0000000,0000000..e71bcc2
new file mode 100644
--- /dev/null
+++ b/external/flux/flux-core/src/main/java/org/apache/storm/flux/model/VertexDef.java
@@@ -1,0 -1,0 +1,36 @@@
++/*
++ * Licensed to the Apache Software Foundation (ASF) under one
++ * or more contributor license agreements.  See the NOTICE file
++ * distributed with this work for additional information
++ * regarding copyright ownership.  The ASF licenses this file
++ * to you under the Apache License, Version 2.0 (the
++ * "License"); you may not use this file except in compliance
++ * with the License.  You may obtain a copy of the License at
++ *
++ * http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing, software
++ * distributed under the License is distributed on an "AS IS" BASIS,
++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
++ * See the License for the specific language governing permissions and
++ * limitations under the License.
++ */
++package org.apache.storm.flux.model;
++
++/**
++ * Abstract parent class of component definitions
++ * (spouts/bolts)
++ */
++public abstract class VertexDef extends BeanDef {
++
++    // default parallelism to 1 so if it's ommitted, the topology will still function.
++    private int parallelism = 1;
++
++    public int getParallelism() {
++        return parallelism;
++    }
++
++    public void setParallelism(int parallelism) {
++        this.parallelism = parallelism;
++    }
++}

http://git-wip-us.apache.org/repos/asf/storm/blob/b21a98dd/external/flux/flux-core/src/main/java/org/apache/storm/flux/parser/FluxParser.java
----------------------------------------------------------------------
diff --cc external/flux/flux-core/src/main/java/org/apache/storm/flux/parser/FluxParser.java
index 0000000,0000000..72f8a8e
new file mode 100644
--- /dev/null
+++ b/external/flux/flux-core/src/main/java/org/apache/storm/flux/parser/FluxParser.java
@@@ -1,0 -1,0 +1,202 @@@
++/*
++ * Licensed to the Apache Software Foundation (ASF) under one
++ * or more contributor license agreements.  See the NOTICE file
++ * distributed with this work for additional information
++ * regarding copyright ownership.  The ASF licenses this file
++ * to you under the Apache License, Version 2.0 (the
++ * "License"); you may not use this file except in compliance
++ * with the License.  You may obtain a copy of the License at
++ *
++ * http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing, software
++ * distributed under the License is distributed on an "AS IS" BASIS,
++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
++ * See the License for the specific language governing permissions and
++ * limitations under the License.
++ */
++package org.apache.storm.flux.parser;
++
++import org.apache.storm.flux.api.TopologySource;
++import org.apache.storm.flux.model.BoltDef;
++import org.apache.storm.flux.model.IncludeDef;
++import org.apache.storm.flux.model.SpoutDef;
++import org.apache.storm.flux.model.TopologyDef;
++import org.slf4j.Logger;
++import org.slf4j.LoggerFactory;
++import org.yaml.snakeyaml.TypeDescription;
++import org.yaml.snakeyaml.Yaml;
++import org.yaml.snakeyaml.constructor.Constructor;
++
++import java.io.ByteArrayOutputStream;
++import java.io.FileInputStream;
++import java.io.IOException;
++import java.io.InputStream;
++import java.nio.ByteBuffer;
++import java.util.Map;
++import java.util.Properties;
++
++public class FluxParser {
++    private static final Logger LOG = LoggerFactory.getLogger(FluxParser.class);
++
++    private FluxParser(){}
++
++    // TODO refactor input stream processing (see parseResource() method).
++    public static TopologyDef parseFile(String inputFile, boolean dumpYaml, boolean processIncludes,
++                                        String propertiesFile, boolean envSub) throws IOException {
++        Yaml yaml = yaml();
++        FileInputStream in = new FileInputStream(inputFile);
++        // TODO process properties, etc.
++        TopologyDef topology = loadYaml(yaml, in, propertiesFile, envSub);
++        in.close();
++        if(dumpYaml){
++            dumpYaml(topology, yaml);
++        }
++        if(processIncludes) {
++            return processIncludes(yaml, topology, propertiesFile, envSub);
++        } else {
++            return topology;
++        }
++    }
++
++    public static TopologyDef parseResource(String resource, boolean dumpYaml, boolean processIncludes,
++                                            String propertiesFile, boolean envSub) throws IOException {
++        Yaml yaml = yaml();
++        InputStream in = FluxParser.class.getResourceAsStream(resource);
++        if(in == null){
++            LOG.error("Unable to load classpath resource: " + resource);
++            System.exit(1);
++        }
++        TopologyDef topology = loadYaml(yaml, in, propertiesFile, envSub);
++        in.close();
++        if(dumpYaml){
++            dumpYaml(topology, yaml);
++        }
++        if(processIncludes) {
++            return processIncludes(yaml, topology, propertiesFile, envSub);
++        } else {
++            return topology;
++        }
++    }
++
++    private static TopologyDef loadYaml(Yaml yaml, InputStream in, String propsFile, boolean envSubstitution) throws IOException {
++        ByteArrayOutputStream bos = new ByteArrayOutputStream();
++        LOG.info("loading YAML from input stream...");
++        int b = -1;
++        while((b = in.read()) != -1){
++            bos.write(b);
++        }
++
++        // TODO substitution implementation is not exactly efficient or kind to memory...
++        String str = bos.toString();
++        // properties file substitution
++        if(propsFile != null){
++            LOG.info("Performing property substitution.");
++            InputStream propsIn = new FileInputStream(propsFile);
++            Properties props = new Properties();
++            props.load(propsIn);
++            for(Object key : props.keySet()){
++                str = str.replace("${" + key + "}", props.getProperty((String)key));
++            }
++        } else {
++            LOG.info("Not performing property substitution.");
++        }
++
++        // environment variable substitution
++        if(envSubstitution){
++            LOG.info("Performing environment variable substitution...");
++            Map<String, String> envs = System.getenv();
++            for(String key : envs.keySet()){
++                str = str.replace("${ENV-" + key + "}", envs.get(key));
++            }
++        } else {
++            LOG.info("Not performing environment variable substitution.");
++        }
++        return (TopologyDef)yaml.load(str);
++    }
++
++    private static void dumpYaml(TopologyDef topology, Yaml yaml){
++        System.out.println("Configuration (interpreted): \n" + yaml.dump(topology));
++    }
++
++    private static Yaml yaml(){
++        Constructor constructor = new Constructor(TopologyDef.class);
++
++        TypeDescription topologyDescription = new TypeDescription(TopologyDef.class);
++        topologyDescription.putListPropertyType("spouts", SpoutDef.class);
++        topologyDescription.putListPropertyType("bolts", BoltDef.class);
++        topologyDescription.putListPropertyType("includes", IncludeDef.class);
++        constructor.addTypeDescription(topologyDescription);
++
++        Yaml  yaml = new Yaml(constructor);
++        return yaml;
++    }
++
++    /**
++     *
++     * @param yaml the yaml parser for parsing the include file(s)
++     * @param topologyDef the topology definition containing (possibly zero) includes
++     * @return The TopologyDef with includes resolved.
++     */
++    private static TopologyDef processIncludes(Yaml yaml, TopologyDef topologyDef, String propsFile, boolean envSub)
++            throws IOException {
++        //TODO support multiple levels of includes
++        if(topologyDef.getIncludes() != null) {
++            for (IncludeDef include : topologyDef.getIncludes()){
++                TopologyDef includeTopologyDef = null;
++                if (include.isResource()) {
++                    LOG.info("Loading includes from resource: {}", include.getFile());
++                    includeTopologyDef = parseResource(include.getFile(), true, false, propsFile, envSub);
++                } else {
++                    LOG.info("Loading includes from file: {}", include.getFile());
++                    includeTopologyDef = parseFile(include.getFile(), true, false, propsFile, envSub);
++                }
++
++                // if overrides are disabled, we won't replace anything that already exists
++                boolean override = include.isOverride();
++                // name
++                if(includeTopologyDef.getName() != null){
++                    topologyDef.setName(includeTopologyDef.getName(), override);
++                }
++
++                // config
++                if(includeTopologyDef.getConfig() != null) {
++                    //TODO move this logic to the model class
++                    Map<String, Object> config = topologyDef.getConfig();
++                    Map<String, Object> includeConfig = includeTopologyDef.getConfig();
++                    if(override) {
++                        config.putAll(includeTopologyDef.getConfig());
++                    } else {
++                        for(String key : includeConfig.keySet()){
++                            if(config.containsKey(key)){
++                                LOG.warn("Ignoring attempt to set topology config property '{}' with override == false", key);
++                            }
++                            else {
++                                config.put(key, includeConfig.get(key));
++                            }
++                        }
++                    }
++                }
++
++                //component overrides
++                if(includeTopologyDef.getComponents() != null){
++                    topologyDef.addAllComponents(includeTopologyDef.getComponents(), override);
++                }
++                //bolt overrides
++                if(includeTopologyDef.getBolts() != null){
++                    topologyDef.addAllBolts(includeTopologyDef.getBolts(), override);
++                }
++                //spout overrides
++                if(includeTopologyDef.getSpouts() != null) {
++                    topologyDef.addAllSpouts(includeTopologyDef.getSpouts(), override);
++                }
++                //stream overrides
++                //TODO streams should be uniquely identifiable
++                if(includeTopologyDef.getStreams() != null) {
++                    topologyDef.addAllStreams(includeTopologyDef.getStreams(), override);
++                }
++            } // end include processing
++        }
++        return topologyDef;
++    }
++}

http://git-wip-us.apache.org/repos/asf/storm/blob/b21a98dd/external/flux/flux-core/src/main/resources/splash.txt
----------------------------------------------------------------------
diff --cc external/flux/flux-core/src/main/resources/splash.txt
index 0000000,0000000..337931a
new file mode 100644
--- /dev/null
+++ b/external/flux/flux-core/src/main/resources/splash.txt
@@@ -1,0 -1,0 +1,9 @@@
++███████╗██╗     ██╗   ██╗██╗  ██╗
++██╔════╝██║     ██║   ██║╚██╗██╔╝
++█████╗  ██║     ██║   ██║ ╚███╔╝
++██╔══╝  ██║     ██║   ██║ ██╔██╗
++██║     ███████╗╚██████╔╝██╔╝ ██╗
++╚═╝     ╚══════╝ ╚═════╝ ╚═╝  ╚═╝
+++-         Apache Storm        -+
+++-  data FLow User eXperience  -+
++Version: ${project.version}

http://git-wip-us.apache.org/repos/asf/storm/blob/b21a98dd/external/flux/flux-core/src/test/java/org/apache/storm/flux/FluxBuilderTest.java
----------------------------------------------------------------------
diff --cc external/flux/flux-core/src/test/java/org/apache/storm/flux/FluxBuilderTest.java
index 0000000,0000000..ff67867
new file mode 100644
--- /dev/null
+++ b/external/flux/flux-core/src/test/java/org/apache/storm/flux/FluxBuilderTest.java
@@@ -1,0 -1,0 +1,31 @@@
++/*
++ * Licensed to the Apache Software Foundation (ASF) under one
++ * or more contributor license agreements.  See the NOTICE file
++ * distributed with this work for additional information
++ * regarding copyright ownership.  The ASF licenses this file
++ * to you under the Apache License, Version 2.0 (the
++ * "License"); you may not use this file except in compliance
++ * with the License.  You may obtain a copy of the License at
++ *
++ * http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing, software
++ * distributed under the License is distributed on an "AS IS" BASIS,
++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
++ * See the License for the specific language governing permissions and
++ * limitations under the License.
++ */
++package org.apache.storm.flux;
++
++import org.junit.Test;
++import static org.junit.Assert.*;
++
++public class FluxBuilderTest {
++
++    @Test
++    public void testIsPrimitiveNumber() throws Exception {
++        assertTrue(FluxBuilder.isPrimitiveNumber(int.class));
++        assertFalse(FluxBuilder.isPrimitiveNumber(boolean.class));
++        assertFalse(FluxBuilder.isPrimitiveNumber(String.class));
++    }
++}

http://git-wip-us.apache.org/repos/asf/storm/blob/b21a98dd/external/flux/flux-core/src/test/java/org/apache/storm/flux/IntegrationTest.java
----------------------------------------------------------------------
diff --cc external/flux/flux-core/src/test/java/org/apache/storm/flux/IntegrationTest.java
index 0000000,0000000..5e17f5e
new file mode 100644
--- /dev/null
+++ b/external/flux/flux-core/src/test/java/org/apache/storm/flux/IntegrationTest.java
@@@ -1,0 -1,0 +1,41 @@@
++/*
++ * Licensed to the Apache Software Foundation (ASF) under one
++ * or more contributor license agreements.  See the NOTICE file
++ * distributed with this work for additional information
++ * regarding copyright ownership.  The ASF licenses this file
++ * to you under the Apache License, Version 2.0 (the
++ * "License"); you may not use this file except in compliance
++ * with the License.  You may obtain a copy of the License at
++ *
++ * http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing, software
++ * distributed under the License is distributed on an "AS IS" BASIS,
++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
++ * See the License for the specific language governing permissions and
++ * limitations under the License.
++ */
++package org.apache.storm.flux;
++
++import org.junit.Test;
++
++public class IntegrationTest {
++
++    private static boolean skipTest = true;
++
++    static {
++        String skipStr = System.getProperty("skipIntegration");
++        if(skipStr != null && skipStr.equalsIgnoreCase("false")){
++            skipTest = false;
++        }
++    }
++
++
++
++    @Test
++    public void testRunTopologySource() throws Exception {
++        if(!skipTest) {
++            Flux.main(new String[]{"-s", "30000", "src/test/resources/configs/existing-topology.yaml"});
++        }
++    }
++}