You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by ka...@apache.org on 2017/04/05 23:19:23 UTC

[20/23] storm git commit: STORM-2453 Move non-connectors into the top directory

http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/external/flux/flux-core/src/main/java/org/apache/storm/flux/FluxBuilder.java
----------------------------------------------------------------------
diff --git a/external/flux/flux-core/src/main/java/org/apache/storm/flux/FluxBuilder.java b/external/flux/flux-core/src/main/java/org/apache/storm/flux/FluxBuilder.java
deleted file mode 100644
index e79dfb7..0000000
--- a/external/flux/flux-core/src/main/java/org/apache/storm/flux/FluxBuilder.java
+++ /dev/null
@@ -1,630 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.flux;
-
-import org.apache.storm.Config;
-import org.apache.storm.generated.StormTopology;
-import org.apache.storm.grouping.CustomStreamGrouping;
-import org.apache.storm.topology.*;
-import org.apache.storm.tuple.Fields;
-import org.apache.storm.utils.Utils;
-import org.apache.storm.flux.model.*;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.lang.reflect.*;
-import java.util.*;
-
-public class FluxBuilder {
-    private static Logger LOG = LoggerFactory.getLogger(FluxBuilder.class);
-
-    /**
-     * Creates a new `org.apache.storm.Config` and copies every entry of the topology
-     * definition's config into it.
-     *
-     * @param topologyDef topology definition whose config entries are copied
-     * @return a new `Config` instance holding those entries
-     */
-    public static Config buildConfig(TopologyDef topologyDef) {
-        Config topologyConfig = new Config();
-        topologyConfig.putAll(topologyDef.getConfig());
-        return topologyConfig;
-    }
-
-    /**
-     * Builds a Storm topology from the topology definition held by the execution context.
-     *
-     * The definition is validated first and the shared components are instantiated. A DSL
-     * definition is then assembled through a `TopologyBuilder` (spouts, bolts, streams);
-     * otherwise the configured topology source object is asked to produce the topology.
-     *
-     * @param context execution context supplying the topology definition
-     * @return the resulting topology
-     * @throws IllegalArgumentException if the topology definition fails validation
-     * @throws IllegalAccessException
-     * @throws InstantiationException
-     * @throws ClassNotFoundException
-     * @throws NoSuchMethodException
-     * @throws InvocationTargetException
-     * @throws NoSuchFieldException
-     */
-    public static StormTopology buildTopology(ExecutionContext context) throws IllegalAccessException,
-            InstantiationException, ClassNotFoundException, NoSuchMethodException, InvocationTargetException, NoSuchFieldException {
-
-        TopologyDef topologyDef = context.getTopologyDef();
-        if (!topologyDef.validate()) {
-            throw new IllegalArgumentException("Invalid topology config. Spouts, bolts and streams cannot be " +
-                    "defined in the same configuration as a topologySource.");
-        }
-
-        // components are built first because spouts, bolts, etc. may reference them by id
-        buildComponents(context);
-
-        if (!topologyDef.isDslTopology()) {
-            // a user-supplied class builds the topology (this is also the bridge to Trident)
-            LOG.info("A topology source has been specified...");
-            ObjectDef sourceDef = topologyDef.getTopologySource();
-            return buildExternalTopology(sourceDef, context);
-        }
-
-        // DSL (YAML, etc.) topology
-        LOG.info("Detected DSL topology...");
-        TopologyBuilder builder = new TopologyBuilder();
-        buildSpouts(context, builder);
-        // bolts are only instantiated here; they are registered with the builder
-        // while the stream definitions are processed
-        buildBolts(context);
-        buildStreamDefinitions(context, builder);
-        return builder.createTopology();
-    }
-
-    /**
-     * Searches the public methods of the given object for one that is named `methodName`, returns
-     * `StormTopology`, and takes exactly one parameter whose type is assignable from
-     * `java.util.Map` or `org.apache.storm.Config`.
-     *
-     * @param topologySource object to inspect for the specified method
-     * @param methodName name of the method to look for
-     * @return the first matching method; a warning is logged when several match
-     * @throws IllegalArgumentException if no matching method exists
-     * @throws NoSuchMethodException
-     */
-    private static Method findGetTopologyMethod(Object topologySource, String methodName) throws NoSuchMethodException {
-        Class clazz = topologySource.getClass();
-        List<Method> matches = new ArrayList<Method>();
-        for (Method candidate : clazz.getMethods()) {
-            Class[] paramTypes = candidate.getParameterTypes();
-            boolean signatureMatches = candidate.getName().equals(methodName)
-                    && candidate.getReturnType().equals(StormTopology.class)
-                    && paramTypes.length == 1;
-            if (signatureMatches
-                    && (paramTypes[0].isAssignableFrom(Map.class) || paramTypes[0].isAssignableFrom(Config.class))) {
-                matches.add(candidate);
-            }
-        }
-
-        if (matches.isEmpty()) {
-            throw new IllegalArgumentException("Unable to find method '" + methodName + "' method in class: " + clazz.getName());
-        }
-        if (matches.size() > 1) {
-            LOG.warn("Found multiple candidate methods in class '" + clazz.getName() + "'. Using the first one found");
-        }
-        return matches.get(0);
-    }
-
-    /**
-     * Registers the target bolt of every stream definition with the builder and applies the stream's
-     * grouping to the resulting declarer.
-     *
-     * A bolt is registered the first time a stream points at it; its declarer is cached by bolt id and
-     * reused for later streams with the same target.
-     *
-     * @param context execution context used to look up bolts and to build custom groupings
-     * @param builder topology builder the bolts are registered with
-     * @throws IllegalArgumentException if no bolt is found for a stream's target, the target object is
-     *         not one of the supported bolt types, or a FIELDS grouping has no args
-     * @throws UnsupportedOperationException if the grouping type is not handled
-     */
-    private static void buildStreamDefinitions(ExecutionContext context, TopologyBuilder builder)
-            throws ClassNotFoundException, NoSuchMethodException, InvocationTargetException, InstantiationException,
-            IllegalAccessException, NoSuchFieldException {
-        TopologyDef topologyDef = context.getTopologyDef();
-        Map<String, BoltDeclarer> declarers = new HashMap<String, BoltDeclarer>();
-        for (StreamDef stream : topologyDef.getStreams()) {
-            String boltId = stream.getTo();
-            String from = stream.getFrom();
-
-            BoltDeclarer declarer = declarers.get(boltId);
-            if (declarer == null) {
-                declarer = declareBolt(builder, boltId, context.getBolt(boltId), topologyDef);
-                declarers.put(boltId, declarer);
-            }
-
-            GroupingDef grouping = stream.getGrouping();
-            // if the streamId is defined, use it for the grouping, otherwise assume storm's default stream
-            String streamId = (grouping.getStreamId() == null ? Utils.DEFAULT_STREAM_ID : grouping.getStreamId());
-
-            switch (grouping.getType()) {
-                case SHUFFLE:
-                    declarer.shuffleGrouping(from, streamId);
-                    break;
-                case FIELDS:
-                    // fail with a descriptive message instead of a NullPointerException inside Fields
-                    if (grouping.getArgs() == null) {
-                        throw new IllegalArgumentException("FIELDS grouping from '" + from + "' to '" + boltId +
-                                "' requires grouping args.");
-                    }
-                    declarer.fieldsGrouping(from, streamId, new Fields(grouping.getArgs()));
-                    break;
-                case ALL:
-                    declarer.allGrouping(from, streamId);
-                    break;
-                case DIRECT:
-                    declarer.directGrouping(from, streamId);
-                    break;
-                case GLOBAL:
-                    declarer.globalGrouping(from, streamId);
-                    break;
-                case LOCAL_OR_SHUFFLE:
-                    declarer.localOrShuffleGrouping(from, streamId);
-                    break;
-                case NONE:
-                    declarer.noneGrouping(from, streamId);
-                    break;
-                case CUSTOM:
-                    declarer.customGrouping(from, streamId,
-                            buildCustomStreamGrouping(grouping.getCustomClass(), context));
-                    break;
-                default:
-                    throw new UnsupportedOperationException("unsupported grouping type: " + grouping);
-            }
-        }
-    }
-
-    /**
-     * Registers `boltObj` with the builder under `boltId`, picking the `setBolt` overload that
-     * matches the bolt's interface. The interfaces are tested in a fixed order: IRichBolt,
-     * IBasicBolt, IWindowedBolt, IStatefulBolt.
-     *
-     * @throws IllegalArgumentException if `boltObj` is null or implements none of those interfaces
-     */
-    private static BoltDeclarer declareBolt(TopologyBuilder builder, String boltId, Object boltObj,
-                                            TopologyDef topologyDef) {
-        if (boltObj == null) {
-            // without this check the failure was a bare NullPointerException with no bolt id
-            throw new IllegalArgumentException("Stream references a bolt that has not been defined: " + boltId);
-        }
-        if (boltObj instanceof IRichBolt) {
-            return builder.setBolt(boltId, (IRichBolt) boltObj, topologyDef.parallelismForBolt(boltId));
-        }
-        if (boltObj instanceof IBasicBolt) {
-            return builder.setBolt(boltId, (IBasicBolt) boltObj, topologyDef.parallelismForBolt(boltId));
-        }
-        if (boltObj instanceof IWindowedBolt) {
-            return builder.setBolt(boltId, (IWindowedBolt) boltObj, topologyDef.parallelismForBolt(boltId));
-        }
-        if (boltObj instanceof IStatefulBolt) {
-            return builder.setBolt(boltId, (IStatefulBolt) boltObj, topologyDef.parallelismForBolt(boltId));
-        }
-        throw new IllegalArgumentException("Class does not appear to be a bolt: " +
-                boltObj.getClass().getName());
-    }
-
-    /**
-     * Applies the property definitions of `bean` to `instance`.
-     *
-     * The value of each property is either the referenced component or the literal value from the
-     * definition. When `findSetter` returns a method it is invoked with the value; otherwise the
-     * value is written to the public field of the same name.
-     */
-    private static void applyProperties(ObjectDef bean, Object instance, ExecutionContext context) throws
-            IllegalAccessException, InvocationTargetException, NoSuchFieldException {
-        List<PropertyDef> props = bean.getProperties();
-        Class clazz = instance.getClass();
-        if (props == null) {
-            return;
-        }
-        for (PropertyDef prop : props) {
-            Object value;
-            if (prop.isReference()) {
-                value = context.getComponent(prop.getRef());
-            } else {
-                value = prop.getValue();
-            }
-
-            Method setter = findSetter(clazz, prop.getName(), value);
-            if (setter == null) {
-                // fall back to a public instance variable
-                LOG.debug("no setter found. Looking for a public instance variable...");
-                Field field = findPublicField(clazz, prop.getName(), value);
-                if (field != null) {
-                    field.set(instance, value);
-                }
-                continue;
-            }
-
-            LOG.debug("found setter, attempting to invoke");
-            setter.invoke(instance, new Object[]{value});
-        }
-    }
-
-    /**
-     * Returns the public field named `property` of `clazz`. The `arg` parameter is not used.
-     *
-     * @throws NoSuchFieldException if `clazz` has no public field with that name
-     */
-    private static Field findPublicField(Class clazz, String property, Object arg) throws NoSuchFieldException {
-        return clazz.getField(property);
-    }
-
-    /**
-     * Looks up a public setter for `property` on `clazz`.
-     *
-     * Only methods with the setter name that take exactly one parameter are considered, because the
-     * caller invokes the result with a single argument. When several such overloads exist, one whose
-     * parameter type accepts `arg` is preferred; otherwise the last candidate found is returned.
-     *
-     * @param clazz class to inspect
-     * @param property property name, converted with `toSetterName`
-     * @param arg value the setter will be invoked with; may be null
-     * @return the setter, or `null` if no single-parameter method with that name exists
-     */
-    private static Method findSetter(Class clazz, String property, Object arg) {
-        String setterName = toSetterName(property);
-        Method fallback = null;
-        for (Method method : clazz.getMethods()) {
-            if (!setterName.equals(method.getName())) {
-                continue;
-            }
-            Class[] paramTypes = method.getParameterTypes();
-            if (paramTypes.length != 1) {
-                // a same-named method with another arity cannot be invoked with one value
-                continue;
-            }
-            LOG.debug("Found setter method: {}", method.getName());
-            if (arg != null && paramTypes[0].isInstance(arg)) {
-                return method;
-            }
-            fallback = method;
-        }
-        return fallback;
-    }
-
-    /**
-     * Derives the setter name for a property, e.g. `foo` becomes `setFoo`.
-     *
-     * The first character is upper-cased with `Locale.ROOT` so the result does not depend on the
-     * JVM default locale (under a Turkish locale, upper-casing "i" yields a dotted capital I).
-     *
-     * @param name property name
-     * @return `set` followed by the name with its first character upper-cased
-     * @throws IllegalArgumentException if `name` is null or empty
-     */
-    private static String toSetterName(String name) {
-        if (name == null || name.isEmpty()) {
-            throw new IllegalArgumentException("Property name must not be null or empty.");
-        }
-        return "set" + name.substring(0, 1).toUpperCase(Locale.ROOT) + name.substring(1);
-    }
-
-    /**
-     * Returns a copy of `args` in which every `BeanReference` is replaced by the component it names
-     * and every `BeanListReference` by a list of the components it names. All other arguments are
-     * copied unchanged.
-     */
-    private static List<Object> resolveReferences(List<Object> args, ExecutionContext context) {
-        LOG.debug("Checking arguments for references.");
-        List<Object> cArgs = new ArrayList<Object>();
-        for (Object arg : args) {
-            Object resolved = arg;
-            if (arg instanceof BeanReference) {
-                BeanReference single = (BeanReference) arg;
-                resolved = context.getComponent(single.getId());
-            } else if (arg instanceof BeanListReference) {
-                List<Object> components = new ArrayList<>();
-                for (String id : ((BeanListReference) arg).getIds()) {
-                    components.add(context.getComponent(id));
-                }
-                LOG.debug("BeanListReference resolved as {}", components);
-                resolved = components;
-            }
-            cArgs.add(resolved);
-        }
-        return cArgs;
-    }
-
-    /**
-     * Instantiates the class named by `def`, then applies its properties and config methods.
-     *
-     * Without constructor arguments the no-arg constructor is used. Otherwise references in the
-     * arguments are resolved (when the definition has any) and a compatible constructor is looked up
-     * and invoked with the coerced arguments.
-     *
-     * @throws IllegalArgumentException if constructor arguments are given but no compatible
-     *         constructor is found
-     */
-    private static Object buildObject(ObjectDef def, ExecutionContext context) throws ClassNotFoundException,
-            IllegalAccessException, InstantiationException, NoSuchMethodException, InvocationTargetException, NoSuchFieldException {
-        Class clazz = Class.forName(def.getClassName());
-        Object obj;
-        if (!def.hasConstructorArgs()) {
-            obj = clazz.newInstance();
-        } else {
-            LOG.debug("Found constructor arguments in definition: " + def.getConstructorArgs().getClass().getName());
-            List<Object> cArgs = def.getConstructorArgs();
-            if (def.hasReferences()) {
-                cArgs = resolveReferences(cArgs, context);
-            }
-            Constructor con = findCompatibleConstructor(cArgs, clazz);
-            if (con == null) {
-                throw new IllegalArgumentException(
-                        String.format("Couldn't find a suitable constructor for class '%s' with arguments '%s'.",
-                                clazz.getName(),
-                                cArgs));
-            }
-            LOG.debug("Found something seemingly compatible, attempting invocation...");
-            obj = con.newInstance(getArgsWithListCoercian(cArgs, con.getParameterTypes()));
-        }
-        applyProperties(def, obj, context);
-        invokeConfigMethods(def, obj, context);
-        return obj;
-    }
-
-    /**
-     * Builds the topology source object described by `def` and invokes its topology method.
-     *
-     * The method name is read from the topology definition's topology source. If the method's
-     * parameter type is exactly `Config`, a new `Config` populated from the topology config is
-     * passed; otherwise the topology config itself is passed.
-     */
-    private static StormTopology buildExternalTopology(ObjectDef def, ExecutionContext context)
-            throws ClassNotFoundException, IllegalAccessException, InstantiationException, NoSuchMethodException,
-            InvocationTargetException, NoSuchFieldException {
-
-        Object topologySource = buildObject(def, context);
-
-        String methodName = context.getTopologyDef().getTopologySource().getMethodName();
-        Method getTopology = findGetTopologyMethod(topologySource, methodName);
-
-        Object configArg;
-        if (Config.class.equals(getTopology.getParameterTypes()[0])) {
-            Config config = new Config();
-            config.putAll(context.getTopologyDef().getConfig());
-            configArg = config;
-        } else {
-            configArg = context.getTopologyDef().getConfig();
-        }
-        return (StormTopology) getTopology.invoke(topologySource, configArg);
-    }
-
-    /**
-     * Builds the object described by `def` and returns it cast to `CustomStreamGrouping`.
-     */
-    private static CustomStreamGrouping buildCustomStreamGrouping(ObjectDef def, ExecutionContext context)
-            throws ClassNotFoundException,
-            IllegalAccessException, InstantiationException, NoSuchMethodException, InvocationTargetException, NoSuchFieldException {
-        return (CustomStreamGrouping) buildObject(def, context);
-    }
-
-    /**
-     * Instantiates every component found in the topology definition and registers each one with the
-     * execution context under its component id. Does nothing when the definition has no components.
-     */
-    private static void buildComponents(ExecutionContext context) throws ClassNotFoundException, NoSuchMethodException,
-            IllegalAccessException, InvocationTargetException, InstantiationException, NoSuchFieldException {
-        Collection<BeanDef> componentDefs = context.getTopologyDef().getComponents();
-        if (componentDefs == null) {
-            return;
-        }
-        for (BeanDef componentDef : componentDefs) {
-            Object component = buildObject(componentDef, context);
-            context.addComponent(componentDef.getId(), component);
-        }
-    }
-
-
-    /**
-     * Builds every spout in the topology definition, registers it with the builder using the
-     * definition's id and parallelism, and then adds it to the execution context.
-     */
-    private static void buildSpouts(ExecutionContext context, TopologyBuilder builder) throws ClassNotFoundException,
-            NoSuchMethodException, InvocationTargetException, InstantiationException, IllegalAccessException, NoSuchFieldException {
-        for (SpoutDef spoutDef : context.getTopologyDef().getSpouts()) {
-            IRichSpout richSpout = buildSpout(spoutDef, context);
-            String spoutId = spoutDef.getId();
-            builder.setSpout(spoutId, richSpout, spoutDef.getParallelism());
-            context.addSpout(spoutId, richSpout);
-        }
-    }
-
-    /**
-     * Builds the object described by the spout definition via `buildObject` and returns it cast to
-     * `IRichSpout`.
-     */
-    private static IRichSpout buildSpout(SpoutDef def, ExecutionContext context) throws ClassNotFoundException,
-            IllegalAccessException, InstantiationException, NoSuchMethodException, InvocationTargetException, NoSuchFieldException {
-        Object spout = buildObject(def, context);
-        return (IRichSpout) spout;
-    }
-
-    /**
-     * Builds every bolt in the topology definition via `buildObject` and adds it to the execution
-     * context under the bolt definition's id.
-     */
-    private static void buildBolts(ExecutionContext context) throws ClassNotFoundException, IllegalAccessException,
-            InstantiationException, NoSuchMethodException, InvocationTargetException, NoSuchFieldException {
-        for (BoltDef def : context.getTopologyDef().getBolts()) {
-            // buildObject loads the class itself, so no separate Class.forName is needed here
-            Object bolt = buildObject(def, context);
-            context.addBolt(def.getId(), bolt);
-        }
-    }
-
-    /**
-     * Searches the declared constructors of `target` for one that can be invoked with `args`.
-     *
-     * @return the last eligible constructor found (a warning is logged when there are several),
-     *         or `null` if none is eligible
-     */
-    private static Constructor findCompatibleConstructor(List<Object> args, Class target) throws NoSuchMethodException {
-        LOG.debug("Target class: {}, constructor args: {}", target.getName(), args);
-
-        Constructor match = null;
-        int matchCount = 0;
-        for (Constructor con : target.getDeclaredConstructors()) {
-            Class[] paramClasses = con.getParameterTypes();
-            if (paramClasses.length != args.size()) {
-                LOG.debug("Skipping constructor with wrong number of arguments.");
-                continue;
-            }
-            LOG.debug("found constructor with same number of args..");
-            boolean invokable = canInvokeWithArgs(args, paramClasses);
-            if (invokable) {
-                match = con;
-                matchCount++;
-            }
-            LOG.debug("** invokable --> {}", invokable);
-        }
-
-        if (matchCount > 1) {
-            LOG.warn("Found multiple invokable constructors for class {}, given arguments {}. Using the last one found.",
-                    target, args);
-        }
-        return match;
-    }
-
-
-    /**
-     * Invokes each config method listed in `bean` on `instance`.
-     *
-     * Missing argument lists are treated as empty, references are resolved when the method
-     * definition has any, and arguments are coerced to the parameter types of the method found.
-     *
-     * @throws IllegalArgumentException if no compatible method is found for a definition
-     */
-    public static void invokeConfigMethods(ObjectDef bean, Object instance, ExecutionContext context)
-            throws InvocationTargetException, IllegalAccessException {
-
-        List<ConfigMethodDef> methodDefs = bean.getConfigMethods();
-        if (methodDefs == null || methodDefs.isEmpty()) {
-            return;
-        }
-
-        Class clazz = instance.getClass();
-        for (ConfigMethodDef methodDef : methodDefs) {
-            List<Object> args = methodDef.getArgs();
-            if (args == null) {
-                args = new ArrayList<Object>();
-            }
-            if (methodDef.hasReferences()) {
-                args = resolveReferences(args, context);
-            }
-
-            String methodName = methodDef.getName();
-            Method method = findCompatibleMethod(args, clazz, methodName);
-            if (method == null) {
-                throw new IllegalArgumentException(
-                        String.format("Unable to find configuration method '%s' in class '%s' with arguments %s.",
-                                methodName, clazz.getName(), args));
-            }
-            Object[] methodArgs = getArgsWithListCoercian(args, method.getParameterTypes());
-            method.invoke(instance, methodArgs);
-        }
-    }
-
-    /**
-     * Searches the public methods of `target` for one named `methodName` that can be invoked with
-     * `args`. A method taking no parameters is always eligible when `args` is empty.
-     *
-     * @return the last eligible method found (a warning is logged when there are several),
-     *         or `null` if none is eligible
-     */
-    private static Method findCompatibleMethod(List<Object> args, Class target, String methodName){
-        Method retval = null;
-        int eligibleCount = 0;
-
-        LOG.debug("Target class: {}, methodName: {}, args: {}", target.getName(), methodName, args);
-
-        for (Method method : target.getMethods()) {
-            if (!method.getName().equals(methodName)) {
-                // unrelated method; not worth a log line per public method of the class
-                continue;
-            }
-            Class[] paramClasses = method.getParameterTypes();
-            if (paramClasses.length != args.size()) {
-                LOG.debug("Skipping method with wrong number of arguments.");
-                continue;
-            }
-            LOG.debug("found method with same name and number of args..");
-            boolean invokable = args.isEmpty() || canInvokeWithArgs(args, paramClasses);
-            if (invokable) {
-                retval = method;
-                eligibleCount++;
-            }
-            LOG.debug("** invokable --> {}", invokable);
-        }
-        if (eligibleCount > 1) {
-            LOG.warn("Found multiple invokable methods for class {}, method {}, given arguments {}. " +
-                            "Using the last one found.",
-                            new Object[]{target, methodName, args});
-        }
-        return retval;
-    }
-
-    /**
-     * Converts a list of constructor/method arguments into an `Object` array suitable for a
-     * reflective invocation with the given parameter types.
-     *
-     * Per argument, in this order: values already assignable to the parameter type are passed
-     * through; a `Boolean` is unboxed for a primitive boolean parameter; a `Number` is converted
-     * to the primitive numeric parameter type; a `String` is converted to an enum constant for an
-     * enum parameter; a `List` is copied into a new array for an array parameter. An argument
-     * matching none of these leaves its slot `null`.
-     *
-     * @param args arguments to convert; elements are expected to be non-null
-     * @param parameterTypes parameter types of the constructor/method to be invoked
-     * @return the converted arguments, in the same order
-     * @throws IllegalArgumentException if the two inputs differ in length
-     */
-    private static Object[] getArgsWithListCoercian(List<Object> args, Class[] parameterTypes) {
-        if (parameterTypes.length != args.size()) {
-            throw new IllegalArgumentException("Contructor parameter count does not egual argument size.");
-        }
-        Object[] constructorParams = new Object[args.size()];
-
-        // loop through the arguments, if we hit a list that has to be converted to an array,
-        // perform the conversion
-        for (int i = 0; i < args.size(); i++) {
-            Object obj = args.get(i);
-            Class paramType = parameterTypes[i];
-            Class objectType = obj.getClass();
-            LOG.debug("Comparing parameter class {} to object class {} to see if assignment is possible.",
-                    paramType, objectType);
-            if (paramType.equals(objectType)) {
-                LOG.debug("They are the same class.");
-                constructorParams[i] = obj;
-                continue;
-            }
-            if (paramType.isAssignableFrom(objectType)) {
-                LOG.debug("Assignment is possible.");
-                constructorParams[i] = obj;
-                continue;
-            }
-            if (isPrimitiveBoolean(paramType) && Boolean.class.isAssignableFrom(objectType)){
-                LOG.debug("Its a primitive boolean.");
-                Boolean bool = (Boolean) obj;
-                constructorParams[i] = bool.booleanValue();
-                continue;
-            }
-            if(isPrimitiveNumber(paramType) && Number.class.isAssignableFrom(objectType)){
-                LOG.debug("Its a primitive number.");
-                Number num = (Number) obj;
-                if(paramType == Float.TYPE){
-                    constructorParams[i] = num.floatValue();
-                } else if (paramType == Double.TYPE) {
-                    constructorParams[i] = num.doubleValue();
-                } else if (paramType == Long.TYPE) {
-                    constructorParams[i] = num.longValue();
-                } else if (paramType == Integer.TYPE) {
-                    constructorParams[i] = num.intValue();
-                } else if (paramType == Short.TYPE) {
-                    constructorParams[i] = num.shortValue();
-                } else if (paramType == Byte.TYPE) {
-                    constructorParams[i] = num.byteValue();
-                } else {
-                    constructorParams[i] = obj;
-                }
-                continue;
-            }
-
-            // enum conversion
-            if(paramType.isEnum() && objectType.equals(String.class)){
-                LOG.debug("Yes, will convert a String to enum");
-                constructorParams[i] = Enum.valueOf(paramType, (String) obj);
-                continue;
-            }
-
-            // List to array conversion
-            if (paramType.isArray() && List.class.isAssignableFrom(objectType)) {
-                // TODO more collection content type checking
-                LOG.debug("Conversion appears possible...");
-                List list = (List) obj;
-                // only peek at the first element when there is one: an empty list is a valid
-                // argument and must not fail because of a debug statement
-                Object first = list.isEmpty() ? null : list.get(0);
-                LOG.debug("Array Type: {}, List type: {}", paramType.getComponentType(),
-                        first == null ? null : first.getClass());
-
-                // create an array of the right type
-                Object newArrayObj = Array.newInstance(paramType.getComponentType(), list.size());
-                for (int j = 0; j < list.size(); j++) {
-                    Array.set(newArrayObj, j, list.get(j));
-                }
-                constructorParams[i] = newArrayObj;
-                LOG.debug("After conversion: {}", constructorParams[i]);
-            }
-        }
-        return constructorParams;
-    }
-
-
-    /**
-     * Determines whether the given arguments are compatible with the given constructor/method
-     * parameter types, taking into account the conversions the builder can perform: boxed to
-     * primitive boolean/number, `String` to enum, and `List` to array.
-     *
-     * @param args arguments to check
-     * @param parameterTypes parameter types of the candidate constructor/method
-     * @return `true` if every argument can be passed as, or converted to, its parameter type;
-     *         `false` otherwise, including when the sizes differ
-     * @throws IllegalArgumentException if an argument is null
-     */
-    private static boolean canInvokeWithArgs(List<Object> args, Class[] parameterTypes) {
-        if (parameterTypes.length != args.size()) {
-            LOG.warn("parameter types were the wrong size");
-            return false;
-        }
-
-        for (int i = 0; i < args.size(); i++) {
-            Object obj = args.get(i);
-            if (obj == null) {
-                throw new IllegalArgumentException("argument shouldn't be null - index: " + i);
-            }
-            Class paramType = parameterTypes[i];
-            Class objectType = obj.getClass();
-            LOG.debug("Comparing parameter class {} to object class {} to see if assignment is possible.",
-                    paramType, objectType);
-            if (paramType.equals(objectType)) {
-                LOG.debug("Yes, they are the same class.");
-            } else if (paramType.isAssignableFrom(objectType)) {
-                LOG.debug("Yes, assignment is possible.");
-            } else if (isPrimitiveBoolean(paramType) && Boolean.class.isAssignableFrom(objectType)){
-                LOG.debug("Yes, assignment is possible.");
-            } else if(isPrimitiveNumber(paramType) && Number.class.isAssignableFrom(objectType)){
-                LOG.debug("Yes, assignment is possible.");
-            } else if(paramType.isEnum() && objectType.equals(String.class)){
-                LOG.debug("Yes, will convert a String to enum");
-            } else if (paramType.isArray() && List.class.isAssignableFrom(objectType)) {
-                // TODO more collection content type checking
-                LOG.debug("Assignment is possible if we convert a List to an array.");
-                // only peek at the first element when there is one: an empty list is a valid
-                // argument and must not fail because of a debug statement
-                List list = (List) obj;
-                Object first = list.isEmpty() ? null : list.get(0);
-                LOG.debug("Array Type: {}, List type: {}", paramType.getComponentType(),
-                        first == null ? null : first.getClass());
-            } else {
-                return false;
-            }
-        }
-        return true;
-    }
-
-    /**
-     * Tells whether `clazz` is one of the primitive numeric types: byte, short, int, long, float
-     * or double. The other primitives (boolean, char) and void are not numbers, so a `Number`
-     * argument cannot be passed to them.
-     *
-     * @param clazz class to test
-     * @return `true` only for a primitive numeric type
-     */
-    public static boolean isPrimitiveNumber(Class clazz){
-        return clazz.isPrimitive()
-                && !clazz.equals(boolean.class)
-                && !clazz.equals(char.class)
-                && !clazz.equals(void.class);
-    }
-
-    /**
-     * Tells whether `clazz` is the primitive `boolean` type (not the `Boolean` wrapper).
-     */
-    public static boolean isPrimitiveBoolean(Class clazz){
-        // boolean.class is itself primitive, so comparing against it covers both conditions
-        return clazz.equals(Boolean.TYPE);
-    }
-}
-

http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/external/flux/flux-core/src/main/java/org/apache/storm/flux/api/TopologySource.java
----------------------------------------------------------------------
diff --git a/external/flux/flux-core/src/main/java/org/apache/storm/flux/api/TopologySource.java b/external/flux/flux-core/src/main/java/org/apache/storm/flux/api/TopologySource.java
deleted file mode 100644
index 2777854..0000000
--- a/external/flux/flux-core/src/main/java/org/apache/storm/flux/api/TopologySource.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.flux.api;
-
-
-import org.apache.storm.generated.StormTopology;
-
-import java.util.Map;
-
-/**
- * Contract for objects that can produce `StormTopology` objects.
- *
- * If a `topology-source` class implements the `getTopology()` method, Flux will
- * call that method. Otherwise, it will introspect the given class and look for a
- * similar method that produces a `StormTopology` instance.
- *
- * Implementing this interface is therefore optional: if a class defines a method
- * with a similar signature, Flux should be able to find and invoke it.
- */
-public interface TopologySource {
-    /**
-     * Produces a topology from the given configuration map.
-     */
-    StormTopology getTopology(Map<String, Object> config);
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/external/flux/flux-core/src/main/java/org/apache/storm/flux/model/BeanDef.java
----------------------------------------------------------------------
diff --git a/external/flux/flux-core/src/main/java/org/apache/storm/flux/model/BeanDef.java b/external/flux/flux-core/src/main/java/org/apache/storm/flux/model/BeanDef.java
deleted file mode 100644
index 72ca5ae..0000000
--- a/external/flux/flux-core/src/main/java/org/apache/storm/flux/model/BeanDef.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.flux.model;
-
-import java.util.ArrayList;
-import java.util.LinkedHashMap;
-import java.util.List;
-import java.util.Map;
-
-/**
- * A representation of a Java object that is uniquely identifyable, and given a className, constructor arguments,
- * and properties, can be instantiated.
- */
-public class BeanDef extends ObjectDef {
-    private String id;
-
-    public String getId() {
-        return id;
-    }
-
-    public void setId(String id) {
-        this.id = id;
-    }
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/external/flux/flux-core/src/main/java/org/apache/storm/flux/model/BeanListReference.java
----------------------------------------------------------------------
diff --git a/external/flux/flux-core/src/main/java/org/apache/storm/flux/model/BeanListReference.java b/external/flux/flux-core/src/main/java/org/apache/storm/flux/model/BeanListReference.java
deleted file mode 100644
index 652210c..0000000
--- a/external/flux/flux-core/src/main/java/org/apache/storm/flux/model/BeanListReference.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.flux.model;
-
-import java.util.List;
-
-/**
- * A bean list reference is a list of bean reference.
- */
-public class BeanListReference {
-    public List<String> ids;
-
-    public BeanListReference(){}
-
-    public BeanListReference(List<String> ids){
-        this.ids = ids;
-    }
-
-    public List<String> getIds() {
-        return ids;
-    }
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/external/flux/flux-core/src/main/java/org/apache/storm/flux/model/BeanReference.java
----------------------------------------------------------------------
diff --git a/external/flux/flux-core/src/main/java/org/apache/storm/flux/model/BeanReference.java b/external/flux/flux-core/src/main/java/org/apache/storm/flux/model/BeanReference.java
deleted file mode 100644
index bd236f1..0000000
--- a/external/flux/flux-core/src/main/java/org/apache/storm/flux/model/BeanReference.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.flux.model;
-
-/**
- * A bean reference is simply a string pointer to another id.
- */
-public class BeanReference {
-    public String id;
-
-    public BeanReference(){}
-
-    public BeanReference(String id){
-        this.id = id;
-    }
-
-    public String getId() {
-        return id;
-    }
-
-    public void setId(String id) {
-        this.id = id;
-    }
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/external/flux/flux-core/src/main/java/org/apache/storm/flux/model/BoltDef.java
----------------------------------------------------------------------
diff --git a/external/flux/flux-core/src/main/java/org/apache/storm/flux/model/BoltDef.java b/external/flux/flux-core/src/main/java/org/apache/storm/flux/model/BoltDef.java
deleted file mode 100644
index 362abf1..0000000
--- a/external/flux/flux-core/src/main/java/org/apache/storm/flux/model/BoltDef.java
+++ /dev/null
@@ -1,24 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.flux.model;
-
-/**
- * Bean representation of a Storm bolt.
- */
-public class BoltDef extends VertexDef {
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/external/flux/flux-core/src/main/java/org/apache/storm/flux/model/ConfigMethodDef.java
----------------------------------------------------------------------
diff --git a/external/flux/flux-core/src/main/java/org/apache/storm/flux/model/ConfigMethodDef.java b/external/flux/flux-core/src/main/java/org/apache/storm/flux/model/ConfigMethodDef.java
deleted file mode 100644
index 69cabc3..0000000
--- a/external/flux/flux-core/src/main/java/org/apache/storm/flux/model/ConfigMethodDef.java
+++ /dev/null
@@ -1,67 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.flux.model;
-
-import java.util.ArrayList;
-import java.util.LinkedHashMap;
-import java.util.List;
-import java.util.Map;
-
-public class ConfigMethodDef {
-    private String name;
-    private List<Object> args;
-    private boolean hasReferences = false;
-
-    public String getName() {
-        return name;
-    }
-
-    public void setName(String name) {
-        this.name = name;
-    }
-
-    public List<Object> getArgs() {
-        return args;
-    }
-
-    public void setArgs(List<Object> args) {
-
-        List<Object> newVal = new ArrayList<Object>();
-        for(Object obj : args){
-            if(obj instanceof LinkedHashMap){
-                Map map = (Map)obj;
-                if(map.containsKey("ref") && map.size() == 1){
-                    newVal.add(new BeanReference((String)map.get("ref")));
-                    this.hasReferences = true;
-                } else if (map.containsKey("reflist") && map.size() == 1) {
-                    newVal.add(new BeanListReference((List<String>) map.get("reflist")));
-                    this.hasReferences = true;
-                } else {
-                    newVal.add(obj);
-                }
-            } else {
-                newVal.add(obj);
-            }
-        }
-        this.args = newVal;
-    }
-
-    public boolean hasReferences(){
-        return this.hasReferences;
-    }
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/external/flux/flux-core/src/main/java/org/apache/storm/flux/model/ExecutionContext.java
----------------------------------------------------------------------
diff --git a/external/flux/flux-core/src/main/java/org/apache/storm/flux/model/ExecutionContext.java b/external/flux/flux-core/src/main/java/org/apache/storm/flux/model/ExecutionContext.java
deleted file mode 100644
index 1520006..0000000
--- a/external/flux/flux-core/src/main/java/org/apache/storm/flux/model/ExecutionContext.java
+++ /dev/null
@@ -1,77 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.flux.model;
-
-import org.apache.storm.Config;
-import org.apache.storm.task.IBolt;
-import org.apache.storm.topology.IRichSpout;
-
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-/**
- * Container for all the objects required to instantiate a topology.
- */
-public class ExecutionContext {
-    // parsed Topology definition
-    TopologyDef topologyDef;
-
-    // Storm config
-    private Config config;
-
-    // components
-    private List<Object> compontents;
-    // indexed by id
-    private Map<String, Object> componentMap = new HashMap<String, Object>();
-
-    private Map<String, IRichSpout> spoutMap = new HashMap<String, IRichSpout>();
-
-    private List<IBolt> bolts;
-    private Map<String, Object> boltMap = new HashMap<String, Object>();
-
-    public ExecutionContext(TopologyDef topologyDef, Config config){
-        this.topologyDef = topologyDef;
-        this.config = config;
-    }
-
-    public TopologyDef getTopologyDef(){
-        return this.topologyDef;
-    }
-
-    public void addSpout(String id, IRichSpout spout){
-        this.spoutMap.put(id, spout);
-    }
-
-    public void addBolt(String id, Object bolt){
-        this.boltMap.put(id, bolt);
-    }
-
-    public Object getBolt(String id){
-        return this.boltMap.get(id);
-    }
-
-    public void addComponent(String id, Object value){
-        this.componentMap.put(id, value);
-    }
-
-    public Object getComponent(String id){
-        return this.componentMap.get(id);
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/external/flux/flux-core/src/main/java/org/apache/storm/flux/model/GroupingDef.java
----------------------------------------------------------------------
diff --git a/external/flux/flux-core/src/main/java/org/apache/storm/flux/model/GroupingDef.java b/external/flux/flux-core/src/main/java/org/apache/storm/flux/model/GroupingDef.java
deleted file mode 100644
index e4fac8e..0000000
--- a/external/flux/flux-core/src/main/java/org/apache/storm/flux/model/GroupingDef.java
+++ /dev/null
@@ -1,77 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.flux.model;
-
-import java.util.List;
-
-/**
- * Bean representation of a Storm stream grouping.
- */
-public class GroupingDef {
-
-    /**
-     * Types of stream groupings Storm allows
-     */
-    public static enum Type {
-        ALL,
-        CUSTOM,
-        DIRECT,
-        SHUFFLE,
-        LOCAL_OR_SHUFFLE,
-        FIELDS,
-        GLOBAL,
-        NONE
-    }
-
-    private Type type;
-    private String streamId;
-    private List<String> args;
-    private ObjectDef customClass;
-
-    public List<String> getArgs() {
-        return args;
-    }
-
-    public void setArgs(List<String> args) {
-        this.args = args;
-    }
-
-    public Type getType() {
-        return type;
-    }
-
-    public void setType(Type type) {
-        this.type = type;
-    }
-
-    public String getStreamId() {
-        return streamId;
-    }
-
-    public void setStreamId(String streamId) {
-        this.streamId = streamId;
-    }
-
-    public ObjectDef getCustomClass() {
-        return customClass;
-    }
-
-    public void setCustomClass(ObjectDef customClass) {
-        this.customClass = customClass;
-    }
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/external/flux/flux-core/src/main/java/org/apache/storm/flux/model/IncludeDef.java
----------------------------------------------------------------------
diff --git a/external/flux/flux-core/src/main/java/org/apache/storm/flux/model/IncludeDef.java b/external/flux/flux-core/src/main/java/org/apache/storm/flux/model/IncludeDef.java
deleted file mode 100644
index 23fd9d2..0000000
--- a/external/flux/flux-core/src/main/java/org/apache/storm/flux/model/IncludeDef.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.flux.model;
-
-/**
- * Represents an include. Includes can be either a file or a classpath resource.
- *
- * If an include is marked as `override=true` then existing properties will be replaced.
- *
- */
-public class IncludeDef {
-    private boolean resource = false;
-    boolean override = false;
-    private String file;
-
-    public boolean isResource() {
-        return resource;
-    }
-
-    public void setResource(boolean resource) {
-        this.resource = resource;
-    }
-
-    public String getFile() {
-        return file;
-    }
-
-    public void setFile(String file) {
-        this.file = file;
-    }
-
-    public boolean isOverride() {
-        return override;
-    }
-
-    public void setOverride(boolean override) {
-        this.override = override;
-    }
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/external/flux/flux-core/src/main/java/org/apache/storm/flux/model/ObjectDef.java
----------------------------------------------------------------------
diff --git a/external/flux/flux-core/src/main/java/org/apache/storm/flux/model/ObjectDef.java b/external/flux/flux-core/src/main/java/org/apache/storm/flux/model/ObjectDef.java
deleted file mode 100644
index 04a7e8a..0000000
--- a/external/flux/flux-core/src/main/java/org/apache/storm/flux/model/ObjectDef.java
+++ /dev/null
@@ -1,95 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.flux.model;
-
-import org.apache.storm.Config;
-
-import java.util.ArrayList;
-import java.util.LinkedHashMap;
-import java.util.List;
-import java.util.Map;
-
-/**
- * A representation of a Java object that given a className, constructor arguments,
- * and properties, can be instantiated.
- */
-public class ObjectDef {
-    private String className;
-    private List<Object> constructorArgs;
-    private boolean hasReferences;
-    private List<PropertyDef> properties;
-    private List<ConfigMethodDef> configMethods;
-
-    public String getClassName() {
-        return className;
-    }
-
-    public void setClassName(String className) {
-        this.className = className;
-    }
-
-    public List<Object> getConstructorArgs() {
-        return constructorArgs;
-    }
-
-    public void setConstructorArgs(List<Object> constructorArgs) {
-
-        List<Object> newVal = new ArrayList<Object>();
-        for(Object obj : constructorArgs){
-            if(obj instanceof LinkedHashMap){
-                Map map = (Map)obj;
-                if(map.containsKey("ref") && map.size() == 1) {
-                    newVal.add(new BeanReference((String) map.get("ref")));
-                    this.hasReferences = true;
-                } else if (map.containsKey("reflist") && map.size() == 1) {
-                    newVal.add(new BeanListReference((List<String>) map.get("reflist")));
-                    this.hasReferences = true;
-                } else {
-                    newVal.add(obj);
-                }
-            } else {
-                newVal.add(obj);
-            }
-        }
-        this.constructorArgs = newVal;
-    }
-
-    public boolean hasConstructorArgs(){
-        return this.constructorArgs != null && this.constructorArgs.size() > 0;
-    }
-
-    public boolean hasReferences(){
-        return this.hasReferences;
-    }
-
-    public List<PropertyDef> getProperties() {
-        return properties;
-    }
-
-    public void setProperties(List<PropertyDef> properties) {
-        this.properties = properties;
-    }
-
-    public List<ConfigMethodDef> getConfigMethods() {
-        return configMethods;
-    }
-
-    public void setConfigMethods(List<ConfigMethodDef> configMethods) {
-        this.configMethods = configMethods;
-    }
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/external/flux/flux-core/src/main/java/org/apache/storm/flux/model/PropertyDef.java
----------------------------------------------------------------------
diff --git a/external/flux/flux-core/src/main/java/org/apache/storm/flux/model/PropertyDef.java b/external/flux/flux-core/src/main/java/org/apache/storm/flux/model/PropertyDef.java
deleted file mode 100644
index f3d7704..0000000
--- a/external/flux/flux-core/src/main/java/org/apache/storm/flux/model/PropertyDef.java
+++ /dev/null
@@ -1,58 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.flux.model;
-
-public class PropertyDef {
-    private String name;
-    private Object value;
-    private String ref;
-
-    public String getName() {
-        return name;
-    }
-
-    public void setName(String name) {
-        this.name = name;
-    }
-
-    public Object getValue() {
-        return value;
-    }
-
-    public void setValue(Object value) {
-        if(this.ref != null){
-            throw new IllegalStateException("A property can only have a value OR a reference, not both.");
-        }
-        this.value = value;
-    }
-
-    public String getRef() {
-        return ref;
-    }
-
-    public void setRef(String ref) {
-        if(this.value != null){
-            throw new IllegalStateException("A property can only have a value OR a reference, not both.");
-        }
-        this.ref = ref;
-    }
-
-    public boolean isReference(){
-        return this.ref != null;
-    }
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/external/flux/flux-core/src/main/java/org/apache/storm/flux/model/SpoutDef.java
----------------------------------------------------------------------
diff --git a/external/flux/flux-core/src/main/java/org/apache/storm/flux/model/SpoutDef.java b/external/flux/flux-core/src/main/java/org/apache/storm/flux/model/SpoutDef.java
deleted file mode 100644
index 277c601..0000000
--- a/external/flux/flux-core/src/main/java/org/apache/storm/flux/model/SpoutDef.java
+++ /dev/null
@@ -1,24 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.flux.model;
-
-/**
- * Bean representation of a Storm spout.
- */
-public class SpoutDef extends VertexDef {
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/external/flux/flux-core/src/main/java/org/apache/storm/flux/model/StreamDef.java
----------------------------------------------------------------------
diff --git a/external/flux/flux-core/src/main/java/org/apache/storm/flux/model/StreamDef.java b/external/flux/flux-core/src/main/java/org/apache/storm/flux/model/StreamDef.java
deleted file mode 100644
index da80f1c..0000000
--- a/external/flux/flux-core/src/main/java/org/apache/storm/flux/model/StreamDef.java
+++ /dev/null
@@ -1,64 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.flux.model;
-
-/**
- * Represents a stream of tuples from one Storm component (Spout or Bolt) to another (an edge in the topology DAG).
- *
- * Required fields are `from` and `to`, which define the source and destination, and the stream `grouping`.
- *
- */
-public class StreamDef {
-
-    private String name; // not used, placeholder for GUI, etc.
-    private String from;
-    private String to;
-    private GroupingDef grouping;
-
-    public String getTo() {
-        return to;
-    }
-
-    public void setTo(String to) {
-        this.to = to;
-    }
-
-    public String getName() {
-        return name;
-    }
-
-    public void setName(String name) {
-        this.name = name;
-    }
-
-    public String getFrom() {
-        return from;
-    }
-
-    public void setFrom(String from) {
-        this.from = from;
-    }
-
-    public GroupingDef getGrouping() {
-        return grouping;
-    }
-
-    public void setGrouping(GroupingDef grouping) {
-        this.grouping = grouping;
-    }
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/external/flux/flux-core/src/main/java/org/apache/storm/flux/model/TopologyDef.java
----------------------------------------------------------------------
diff --git a/external/flux/flux-core/src/main/java/org/apache/storm/flux/model/TopologyDef.java b/external/flux/flux-core/src/main/java/org/apache/storm/flux/model/TopologyDef.java
deleted file mode 100644
index 86614f1..0000000
--- a/external/flux/flux-core/src/main/java/org/apache/storm/flux/model/TopologyDef.java
+++ /dev/null
@@ -1,216 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.flux.model;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.*;
-
-/**
- * Bean represenation of a topology.
- *
- * It consists of the following:
- *   1. The topology name
- *   2. A `java.util.Map` representing the `org.apache.storm.config` for the topology
- *   3. A list of spout definitions
- *   4. A list of bolt definitions
- *   5. A list of stream definitions that define the flow between spouts and bolts.
- *
- */
-public class TopologyDef {
-    private static Logger LOG = LoggerFactory.getLogger(TopologyDef.class);
-
-    private String name;
-    private Map<String, BeanDef> componentMap = new LinkedHashMap<String, BeanDef>(); // not required
-    private List<IncludeDef> includes; // not required
-    private Map<String, Object> config = new HashMap<String, Object>();
-
-    // a "topology source" is a class that can produce a `StormTopology` thrift object.
-    private TopologySourceDef topologySource;
-
-    // the following are required if we're defining a core storm topology DAG in YAML, etc.
-    private Map<String, BoltDef> boltMap = new LinkedHashMap<String, BoltDef>();
-    private Map<String, SpoutDef> spoutMap = new LinkedHashMap<String, SpoutDef>();
-    private List<StreamDef> streams = new ArrayList<StreamDef>();
-
-
-    public String getName() {
-        return name;
-    }
-
-    public void setName(String name) {
-        this.name = name;
-    }
-
-    public void setName(String name, boolean override){
-        if(this.name == null || override){
-            this.name = name;
-        } else {
-            LOG.warn("Ignoring attempt to set property 'name' with override == false.");
-        }
-    }
-
-    public List<SpoutDef> getSpouts() {
-        ArrayList<SpoutDef> retval = new ArrayList<SpoutDef>();
-        retval.addAll(this.spoutMap.values());
-        return retval;
-    }
-
-    public void setSpouts(List<SpoutDef> spouts) {
-        this.spoutMap = new LinkedHashMap<String, SpoutDef>();
-        for(SpoutDef spout : spouts){
-            this.spoutMap.put(spout.getId(), spout);
-        }
-    }
-
-    public List<BoltDef> getBolts() {
-        ArrayList<BoltDef> retval = new ArrayList<BoltDef>();
-        retval.addAll(this.boltMap.values());
-        return retval;
-    }
-
-    public void setBolts(List<BoltDef> bolts) {
-        this.boltMap = new LinkedHashMap<String, BoltDef>();
-        for(BoltDef bolt : bolts){
-            this.boltMap.put(bolt.getId(), bolt);
-        }
-    }
-
-    public List<StreamDef> getStreams() {
-        return streams;
-    }
-
-    public void setStreams(List<StreamDef> streams) {
-        this.streams = streams;
-    }
-
-    public Map<String, Object> getConfig() {
-        return config;
-    }
-
-    public void setConfig(Map<String, Object> config) {
-        this.config = config;
-    }
-
-    public List<BeanDef> getComponents() {
-        ArrayList<BeanDef> retval = new ArrayList<BeanDef>();
-        retval.addAll(this.componentMap.values());
-        return retval;
-    }
-
-    public void setComponents(List<BeanDef> components) {
-        this.componentMap = new LinkedHashMap<String, BeanDef>();
-        for(BeanDef component : components){
-            this.componentMap.put(component.getId(), component);
-        }
-    }
-
-    public List<IncludeDef> getIncludes() {
-        return includes;
-    }
-
-    public void setIncludes(List<IncludeDef> includes) {
-        this.includes = includes;
-    }
-
-    // utility methods
-    public int parallelismForBolt(String boltId){
-        return this.boltMap.get(boltId).getParallelism();
-    }
-
-    public BoltDef getBoltDef(String id){
-        return this.boltMap.get(id);
-    }
-
-    public SpoutDef getSpoutDef(String id){
-        return this.spoutMap.get(id);
-    }
-
-    public BeanDef getComponent(String id){
-        return this.componentMap.get(id);
-    }
-
-    // used by includes implementation
-    public void addAllBolts(List<BoltDef> bolts, boolean override){
-        for(BoltDef bolt : bolts){
-            String id = bolt.getId();
-            if(this.boltMap.get(id) == null || override) {
-                this.boltMap.put(bolt.getId(), bolt);
-            } else {
-                LOG.warn("Ignoring attempt to create bolt '{}' with override == false.", id);
-            }
-        }
-    }
-
-    public void addAllSpouts(List<SpoutDef> spouts, boolean override){
-        for(SpoutDef spout : spouts){
-            String id = spout.getId();
-            if(this.spoutMap.get(id) == null || override) {
-                this.spoutMap.put(spout.getId(), spout);
-            } else {
-                LOG.warn("Ignoring attempt to create spout '{}' with override == false.", id);
-            }
-        }
-    }
-
-    public void addAllComponents(List<BeanDef> components, boolean override) {
-        for(BeanDef bean : components){
-            String id = bean.getId();
-            if(this.componentMap.get(id) == null || override) {
-                this.componentMap.put(bean.getId(), bean);
-            } else {
-                LOG.warn("Ignoring attempt to create component '{}' with override == false.", id);
-            }
-        }
-    }
-
-    public void addAllStreams(List<StreamDef> streams, boolean override) {
-        //TODO figure out how we want to deal with overrides. Users may want to add streams even when overriding other
-        // properties. For now we just add them blindly which could lead to a potentially invalid topology.
-        this.streams.addAll(streams);
-    }
-
-    public TopologySourceDef getTopologySource() {
-        return topologySource;
-    }
-
-    public void setTopologySource(TopologySourceDef topologySource) {
-        this.topologySource = topologySource;
-    }
-
-    public boolean isDslTopology(){
-        return this.topologySource == null;
-    }
-
-
-    public boolean validate(){
-        boolean hasSpouts = this.spoutMap != null && this.spoutMap.size() > 0;
-        boolean hasBolts = this.boltMap != null && this.boltMap.size() > 0;
-        boolean hasStreams = this.streams != null && this.streams.size() > 0;
-        boolean hasSpoutsBoltsStreams = hasStreams && hasBolts && hasSpouts;
-        // you cant define a topologySource and a DSL topology at the same time...
-        if (!isDslTopology() && ((hasSpouts || hasBolts || hasStreams))) {
-            return false;
-        }
-        if(isDslTopology() && (hasSpouts && hasBolts && hasStreams)) {
-            return true;
-        }
-        return true;
-    }
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/external/flux/flux-core/src/main/java/org/apache/storm/flux/model/TopologySourceDef.java
----------------------------------------------------------------------
diff --git a/external/flux/flux-core/src/main/java/org/apache/storm/flux/model/TopologySourceDef.java b/external/flux/flux-core/src/main/java/org/apache/storm/flux/model/TopologySourceDef.java
deleted file mode 100644
index d6a2f57..0000000
--- a/external/flux/flux-core/src/main/java/org/apache/storm/flux/model/TopologySourceDef.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.flux.model;
-
-public class TopologySourceDef extends ObjectDef {
-    public static final String DEFAULT_METHOD_NAME = "getTopology";
-
-    private String methodName;
-
-    public TopologySourceDef(){
-        this.methodName = DEFAULT_METHOD_NAME;
-    }
-
-    public String getMethodName() {
-        return methodName;
-    }
-
-    public void setMethodName(String methodName) {
-        this.methodName = methodName;
-    }
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/external/flux/flux-core/src/main/java/org/apache/storm/flux/model/VertexDef.java
----------------------------------------------------------------------
diff --git a/external/flux/flux-core/src/main/java/org/apache/storm/flux/model/VertexDef.java b/external/flux/flux-core/src/main/java/org/apache/storm/flux/model/VertexDef.java
deleted file mode 100644
index e71bcc2..0000000
--- a/external/flux/flux-core/src/main/java/org/apache/storm/flux/model/VertexDef.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.flux.model;
-
-/**
- * Abstract parent class of component definitions
- * (spouts/bolts)
- */
-public abstract class VertexDef extends BeanDef {
-
-    // default parallelism to 1 so if it's ommitted, the topology will still function.
-    private int parallelism = 1;
-
-    public int getParallelism() {
-        return parallelism;
-    }
-
-    public void setParallelism(int parallelism) {
-        this.parallelism = parallelism;
-    }
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/external/flux/flux-core/src/main/java/org/apache/storm/flux/parser/FluxParser.java
----------------------------------------------------------------------
diff --git a/external/flux/flux-core/src/main/java/org/apache/storm/flux/parser/FluxParser.java b/external/flux/flux-core/src/main/java/org/apache/storm/flux/parser/FluxParser.java
deleted file mode 100644
index 2a18474..0000000
--- a/external/flux/flux-core/src/main/java/org/apache/storm/flux/parser/FluxParser.java
+++ /dev/null
@@ -1,208 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.flux.parser;
-
-import org.apache.storm.flux.api.TopologySource;
-import org.apache.storm.flux.model.BoltDef;
-import org.apache.storm.flux.model.IncludeDef;
-import org.apache.storm.flux.model.SpoutDef;
-import org.apache.storm.flux.model.TopologyDef;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.yaml.snakeyaml.TypeDescription;
-import org.yaml.snakeyaml.Yaml;
-import org.yaml.snakeyaml.constructor.Constructor;
-
-import java.io.ByteArrayOutputStream;
-import java.io.FileInputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.nio.ByteBuffer;
-import java.util.Map;
-import java.util.Properties;
-
-public class FluxParser {
-    private static final Logger LOG = LoggerFactory.getLogger(FluxParser.class);
-
-    private FluxParser(){}
-
-    // TODO refactor input stream processing (see parseResource() method).
-    public static TopologyDef parseFile(String inputFile, boolean dumpYaml, boolean processIncludes,
-    	String propertiesFile, boolean envSub) throws IOException {
-   
-        FileInputStream in = new FileInputStream(inputFile);
-        TopologyDef topology = parseInputStream(in, dumpYaml, processIncludes, propertiesFile, envSub);
-        in.close();
-        
-        return topology;
-    }
-
-    public static TopologyDef parseResource(String resource, boolean dumpYaml, boolean processIncludes,
-    	String propertiesFile, boolean envSub) throws IOException {
-        
-        InputStream in = FluxParser.class.getResourceAsStream(resource);
-        TopologyDef topology = parseInputStream(in, dumpYaml, processIncludes, propertiesFile, envSub);
-        in.close();
-        
-        return topology;
-    }
-    
-    public static TopologyDef parseInputStream(InputStream inputStream, boolean dumpYaml, boolean processIncludes,
-    	String propertiesFile, boolean envSub) throws IOException {
-		
-	Yaml yaml = yaml();
-    	
-	if (inputStream == null) {
-		LOG.error("Unable to load input stream");
-		System.exit(1);
-	}
-		
-	TopologyDef topology = loadYaml(yaml, inputStream, propertiesFile, envSub);
-		
-	if (dumpYaml) {
-		dumpYaml(topology, yaml);
-	}
-	
-	if (processIncludes) {
-		return processIncludes(yaml, topology, propertiesFile, envSub);
-	} else {
-		return topology;
-	}
-    }
-
-    private static TopologyDef loadYaml(Yaml yaml, InputStream in, String propsFile, boolean envSubstitution) throws IOException {
-        ByteArrayOutputStream bos = new ByteArrayOutputStream();
-        LOG.info("loading YAML from input stream...");
-        int b = -1;
-        while((b = in.read()) != -1){
-            bos.write(b);
-        }
-
-        // TODO substitution implementation is not exactly efficient or kind to memory...
-        String str = bos.toString();
-        // properties file substitution
-        if(propsFile != null){
-            LOG.info("Performing property substitution.");
-            InputStream propsIn = new FileInputStream(propsFile);
-            Properties props = new Properties();
-            props.load(propsIn);
-            for(Object key : props.keySet()){
-                str = str.replace("${" + key + "}", props.getProperty((String)key));
-            }
-        } else {
-            LOG.info("Not performing property substitution.");
-        }
-
-        // environment variable substitution
-        if(envSubstitution){
-            LOG.info("Performing environment variable substitution...");
-            Map<String, String> envs = System.getenv();
-            for(String key : envs.keySet()){
-                str = str.replace("${ENV-" + key + "}", envs.get(key));
-            }
-        } else {
-            LOG.info("Not performing environment variable substitution.");
-        }
-        return (TopologyDef)yaml.load(str);
-    }
-
-    private static void dumpYaml(TopologyDef topology, Yaml yaml){
-        System.out.println("Configuration (interpreted): \n" + yaml.dump(topology));
-    }
-
-    private static Yaml yaml(){
-        Constructor constructor = new Constructor(TopologyDef.class);
-
-        TypeDescription topologyDescription = new TypeDescription(TopologyDef.class);
-        topologyDescription.putListPropertyType("spouts", SpoutDef.class);
-        topologyDescription.putListPropertyType("bolts", BoltDef.class);
-        topologyDescription.putListPropertyType("includes", IncludeDef.class);
-        constructor.addTypeDescription(topologyDescription);
-
-        Yaml  yaml = new Yaml(constructor);
-        return yaml;
-    }
-
-    /**
-     *
-     * @param yaml the yaml parser for parsing the include file(s)
-     * @param topologyDef the topology definition containing (possibly zero) includes
-     * @return The TopologyDef with includes resolved.
-     */
-    private static TopologyDef processIncludes(Yaml yaml, TopologyDef topologyDef, String propsFile, boolean envSub)
-            throws IOException {
-        //TODO support multiple levels of includes
-        if(topologyDef.getIncludes() != null) {
-            for (IncludeDef include : topologyDef.getIncludes()){
-                TopologyDef includeTopologyDef = null;
-                if (include.isResource()) {
-                    LOG.info("Loading includes from resource: {}", include.getFile());
-                    includeTopologyDef = parseResource(include.getFile(), true, false, propsFile, envSub);
-                } else {
-                    LOG.info("Loading includes from file: {}", include.getFile());
-                    includeTopologyDef = parseFile(include.getFile(), true, false, propsFile, envSub);
-                }
-
-                // if overrides are disabled, we won't replace anything that already exists
-                boolean override = include.isOverride();
-                // name
-                if(includeTopologyDef.getName() != null){
-                    topologyDef.setName(includeTopologyDef.getName(), override);
-                }
-
-                // config
-                if(includeTopologyDef.getConfig() != null) {
-                    //TODO move this logic to the model class
-                    Map<String, Object> config = topologyDef.getConfig();
-                    Map<String, Object> includeConfig = includeTopologyDef.getConfig();
-                    if(override) {
-                        config.putAll(includeTopologyDef.getConfig());
-                    } else {
-                        for(String key : includeConfig.keySet()){
-                            if(config.containsKey(key)){
-                                LOG.warn("Ignoring attempt to set topology config property '{}' with override == false", key);
-                            }
-                            else {
-                                config.put(key, includeConfig.get(key));
-                            }
-                        }
-                    }
-                }
-
-                //component overrides
-                if(includeTopologyDef.getComponents() != null){
-                    topologyDef.addAllComponents(includeTopologyDef.getComponents(), override);
-                }
-                //bolt overrides
-                if(includeTopologyDef.getBolts() != null){
-                    topologyDef.addAllBolts(includeTopologyDef.getBolts(), override);
-                }
-                //spout overrides
-                if(includeTopologyDef.getSpouts() != null) {
-                    topologyDef.addAllSpouts(includeTopologyDef.getSpouts(), override);
-                }
-                //stream overrides
-                //TODO streams should be uniquely identifiable
-                if(includeTopologyDef.getStreams() != null) {
-                    topologyDef.addAllStreams(includeTopologyDef.getStreams(), override);
-                }
-            } // end include processing
-        }
-        return topologyDef;
-    }
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/external/flux/flux-core/src/main/resources/splash.txt
----------------------------------------------------------------------
diff --git a/external/flux/flux-core/src/main/resources/splash.txt b/external/flux/flux-core/src/main/resources/splash.txt
deleted file mode 100644
index 337931a..0000000
--- a/external/flux/flux-core/src/main/resources/splash.txt
+++ /dev/null
@@ -1,9 +0,0 @@
-\u2588\u2588\u2588\u2588\u2588\u2588\u2588\u2557\u2588\u2588\u2557     \u2588\u2588\u2557   \u2588\u2588\u2557\u2588\u2588\u2557  \u2588\u2588\u2557
-\u2588\u2588\u2554\u2550\u2550\u2550\u2550\u255d\u2588\u2588\u2551     \u2588\u2588\u2551   \u2588\u2588\u2551\u255a\u2588\u2588\u2557\u2588\u2588\u2554\u255d
-\u2588\u2588\u2588\u2588\u2588\u2557  \u2588\u2588\u2551     \u2588\u2588\u2551   \u2588\u2588\u2551 \u255a\u2588\u2588\u2588\u2554\u255d
-\u2588\u2588\u2554\u2550\u2550\u255d  \u2588\u2588\u2551     \u2588\u2588\u2551   \u2588\u2588\u2551 \u2588\u2588\u2554\u2588\u2588\u2557
-\u2588\u2588\u2551     \u2588\u2588\u2588\u2588\u2588\u2588\u2588\u2557\u255a\u2588\u2588\u2588\u2588\u2588\u2588\u2554\u255d\u2588\u2588\u2554\u255d \u2588\u2588\u2557
-\u255a\u2550\u255d     \u255a\u2550\u2550\u2550\u2550\u2550\u2550\u255d \u255a\u2550\u2550\u2550\u2550\u2550\u255d \u255a\u2550\u255d  \u255a\u2550\u255d
-+-         Apache Storm        -+
-+-  data FLow User eXperience  -+
-Version: ${project.version}

http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/external/flux/flux-core/src/test/java/org/apache/storm/flux/FluxBuilderTest.java
----------------------------------------------------------------------
diff --git a/external/flux/flux-core/src/test/java/org/apache/storm/flux/FluxBuilderTest.java b/external/flux/flux-core/src/test/java/org/apache/storm/flux/FluxBuilderTest.java
deleted file mode 100644
index ff67867..0000000
--- a/external/flux/flux-core/src/test/java/org/apache/storm/flux/FluxBuilderTest.java
+++ /dev/null
@@ -1,31 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.flux;
-
-import org.junit.Test;
-import static org.junit.Assert.*;
-
-public class FluxBuilderTest {
-
-    @Test
-    public void testIsPrimitiveNumber() throws Exception {
-        assertTrue(FluxBuilder.isPrimitiveNumber(int.class));
-        assertFalse(FluxBuilder.isPrimitiveNumber(boolean.class));
-        assertFalse(FluxBuilder.isPrimitiveNumber(String.class));
-    }
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/external/flux/flux-core/src/test/java/org/apache/storm/flux/IntegrationTest.java
----------------------------------------------------------------------
diff --git a/external/flux/flux-core/src/test/java/org/apache/storm/flux/IntegrationTest.java b/external/flux/flux-core/src/test/java/org/apache/storm/flux/IntegrationTest.java
deleted file mode 100644
index c5807f8..0000000
--- a/external/flux/flux-core/src/test/java/org/apache/storm/flux/IntegrationTest.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.flux;
-
-import org.junit.Test;
-
-public class IntegrationTest {
-
-    private static boolean skipTest = true;
-
-    static {
-        String skipStr = System.getProperty("skipIntegration");
-        if(skipStr != null && skipStr.equalsIgnoreCase("false")){
-            skipTest = false;
-        }
-    }
-
-    @Test
-    public void testRunTopologySource() throws Exception {
-        if(!skipTest) {
-            Flux.main(new String[]{"-s", "30000", "src/test/resources/configs/existing-topology.yaml"});
-        }
-    }
-}