You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by pt...@apache.org on 2015/06/04 04:06:07 UTC
[25/50] [abbrv] storm git commit: merge flux into external/flux/
http://git-wip-us.apache.org/repos/asf/storm/blob/b21a98dd/external/flux/flux-core/src/main/java/org/apache/storm/flux/FluxBuilder.java
----------------------------------------------------------------------
diff --cc external/flux/flux-core/src/main/java/org/apache/storm/flux/FluxBuilder.java
index 0000000,0000000..57237b6
new file mode 100644
--- /dev/null
+++ b/external/flux/flux-core/src/main/java/org/apache/storm/flux/FluxBuilder.java
@@@ -1,0 -1,0 +1,591 @@@
++/*
++ * Licensed to the Apache Software Foundation (ASF) under one
++ * or more contributor license agreements. See the NOTICE file
++ * distributed with this work for additional information
++ * regarding copyright ownership. The ASF licenses this file
++ * to you under the Apache License, Version 2.0 (the
++ * "License"); you may not use this file except in compliance
++ * with the License. You may obtain a copy of the License at
++ *
++ * http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing, software
++ * distributed under the License is distributed on an "AS IS" BASIS,
++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
++ * See the License for the specific language governing permissions and
++ * limitations under the License.
++ */
++package org.apache.storm.flux;
++
++import backtype.storm.Config;
++import backtype.storm.generated.StormTopology;
++import backtype.storm.grouping.CustomStreamGrouping;
++import backtype.storm.topology.*;
++import backtype.storm.tuple.Fields;
++import backtype.storm.utils.Utils;
++import org.apache.storm.flux.api.TopologySource;
++import org.apache.storm.flux.model.*;
++import org.slf4j.Logger;
++import org.slf4j.LoggerFactory;
++
++import java.lang.reflect.*;
++import java.util.ArrayList;
++import java.util.Collection;
++import java.util.List;
++import java.util.Map;
++
++public class FluxBuilder {
++ private static Logger LOG = LoggerFactory.getLogger(FluxBuilder.class);
++
++    /**
++     * Creates a fresh `backtype.storm.Config` and copies every entry of the topology
++     * definition's `config` section into it.
++     *
++     * @param topologyDef topology definition whose config entries are copied
++     * @return a new `Config` containing the entries returned by `topologyDef.getConfig()`
++     */
++    public static Config buildConfig(TopologyDef topologyDef) {
++        final Config merged = new Config();
++        // copy the contents of the definition's `config` into the Storm config
++        merged.putAll(topologyDef.getConfig());
++        return merged;
++    }
++
++ /**
++ * Given a topology definition, return a Storm topology that can be run either locally or remotely.
++ *
++ * @param context
++ * @return
++ * @throws IllegalAccessException
++ * @throws InstantiationException
++ * @throws ClassNotFoundException
++ * @throws NoSuchMethodException
++ * @throws InvocationTargetException
++ */
++ static StormTopology buildTopology(ExecutionContext context) throws IllegalAccessException,
++ InstantiationException, ClassNotFoundException, NoSuchMethodException, InvocationTargetException {
++
++ StormTopology topology = null;
++ TopologyDef topologyDef = context.getTopologyDef();
++
++ if(!topologyDef.validate()){
++ throw new IllegalArgumentException("Invalid topology config. Spouts, bolts and streams cannot be " +
++ "defined in the same configuration as a topologySource.");
++ }
++
++ // build components that may be referenced by spouts, bolts, etc.
++ // the map will be a String --> Object where the object is a fully
++ // constructed class instance
++ buildComponents(context);
++
++ if(topologyDef.isDslTopology()) {
++ // This is a DSL (YAML, etc.) topology...
++ LOG.info("Detected DSL topology...");
++
++ TopologyBuilder builder = new TopologyBuilder();
++
++ // create spouts
++ buildSpouts(context, builder);
++
++ // we need to be able to lookup bolts by id, then switch based
++ // on whether they are IBasicBolt or IRichBolt instances
++ buildBolts(context);
++
++ // process stream definitions
++ buildStreamDefinitions(context, builder);
++
++ topology = builder.createTopology();
++ } else {
++ // user class supplied...
++ // this also provides a bridge to Trident...
++ LOG.info("A topology source has been specified...");
++ ObjectDef def = topologyDef.getTopologySource();
++ topology = buildExternalTopology(def, context);
++ }
++ return topology;
++ }
++
++    /**
++     * Searches the public methods of the topology source for one that has the requested name,
++     * returns a `StormTopology` and takes exactly one parameter whose type can accept a
++     * `java.util.Map` or a `backtype.storm.Config`.
++     *
++     * When several methods qualify, a warning is logged and the first one found is used.
++     *
++     * @param topologySource object to inspect for the specified method
++     * @param methodName name of the method to look for
++     * @return the first matching method
++     * @throws IllegalArgumentException if no method qualifies
++     * @throws NoSuchMethodException declared for callers; not thrown by this implementation
++     */
++    private static Method findGetTopologyMethod(Object topologySource, String methodName) throws NoSuchMethodException {
++        Class clazz = topologySource.getClass();
++        ArrayList<Method> candidates = new ArrayList<Method>();
++        for (Method method : clazz.getMethods()) {
++            Class[] paramTypes = method.getParameterTypes();
++            boolean signatureMatches = method.getName().equals(methodName)
++                    && method.getReturnType().equals(StormTopology.class)
++                    && paramTypes.length == 1;
++            if (signatureMatches
++                    && (paramTypes[0].isAssignableFrom(Map.class) || paramTypes[0].isAssignableFrom(Config.class))) {
++                candidates.add(method);
++            }
++        }
++
++        if (candidates.size() == 0) {
++            throw new IllegalArgumentException("Unable to find method '" + methodName + "' method in class: " + clazz.getName());
++        }
++        if (candidates.size() > 1) {
++            LOG.warn("Found multiple candidate methods in class '" + clazz.getName() + "'. Using the first one found");
++        }
++
++        return candidates.get(0);
++    }
++
++    /**
++     * Declares every bolt that is the target of a stream on the given builder and applies each
++     * stream's grouping to that bolt's declarer.
++     *
++     * A bolt is declared on the builder only once, the first time a stream targets it; the
++     * resulting declarer is reused for every further stream with the same target. Declaring
++     * the same id a second time is rejected by `TopologyBuilder`, which would otherwise make
++     * it impossible for one bolt to subscribe to more than one stream.
++     *
++     * @param context execution context holding the topology definition and the built bolts
++     * @param builder topology builder the bolts and groupings are registered with
++     * @throws IllegalArgumentException if a stream targets an id for which the context returns
++     *         no bolt, or the registered object is neither an `IRichBolt` nor an `IBasicBolt`
++     * @throws UnsupportedOperationException if the grouping type is not handled here
++     */
++    private static void buildStreamDefinitions(ExecutionContext context, TopologyBuilder builder)
++            throws ClassNotFoundException, NoSuchMethodException, InvocationTargetException, InstantiationException,
++            IllegalAccessException {
++        TopologyDef topologyDef = context.getTopologyDef();
++        // declarers created so far, keyed by bolt id
++        Map<String, BoltDeclarer> declarers = new java.util.HashMap<String, BoltDeclarer>();
++
++        // process stream definitions
++        for (StreamDef stream : topologyDef.getStreams()) {
++            String boltId = stream.getTo();
++            BoltDeclarer declarer = declarers.get(boltId);
++            if (declarer == null) {
++                Object boltObj = context.getBolt(boltId);
++                if (boltObj instanceof IRichBolt) {
++                    declarer = builder.setBolt(boltId,
++                            (IRichBolt) boltObj,
++                            topologyDef.parallelismForBolt(boltId));
++                } else if (boltObj instanceof IBasicBolt) {
++                    declarer = builder.setBolt(boltId,
++                            (IBasicBolt) boltObj,
++                            topologyDef.parallelismForBolt(boltId));
++                } else if (boltObj == null) {
++                    throw new IllegalArgumentException("Stream refers to an undefined bolt: " + boltId);
++                } else {
++                    throw new IllegalArgumentException("Class does not appear to be a bolt: " +
++                            boltObj.getClass().getName());
++                }
++                declarers.put(boltId, declarer);
++            }
++
++            GroupingDef grouping = stream.getGrouping();
++            // if the streamId is defined, use it for the grouping, otherwise assume storm's default stream
++            String streamId = (grouping.getStreamId() == null ? Utils.DEFAULT_STREAM_ID : grouping.getStreamId());
++
++            switch (grouping.getType()) {
++                case SHUFFLE:
++                    declarer.shuffleGrouping(stream.getFrom(), streamId);
++                    break;
++                case FIELDS:
++                    //TODO check for null grouping args
++                    declarer.fieldsGrouping(stream.getFrom(), streamId, new Fields(grouping.getArgs()));
++                    break;
++                case ALL:
++                    declarer.allGrouping(stream.getFrom(), streamId);
++                    break;
++                case DIRECT:
++                    declarer.directGrouping(stream.getFrom(), streamId);
++                    break;
++                case GLOBAL:
++                    declarer.globalGrouping(stream.getFrom(), streamId);
++                    break;
++                case LOCAL_OR_SHUFFLE:
++                    declarer.localOrShuffleGrouping(stream.getFrom(), streamId);
++                    break;
++                case NONE:
++                    declarer.noneGrouping(stream.getFrom(), streamId);
++                    break;
++                case CUSTOM:
++                    declarer.customGrouping(stream.getFrom(), streamId,
++                            buildCustomStreamGrouping(grouping.getCustomClass(), context));
++                    break;
++                default:
++                    throw new UnsupportedOperationException("unsupported grouping type: " + grouping);
++            }
++        }
++    }
++
++    /**
++     * Applies the property definitions of a bean to an already constructed instance.
++     *
++     * For each property the value is either the referenced component (when the property is a
++     * reference) or the literal value. It is passed to a setter if one is found by name;
++     * otherwise it is assigned to a public field of that name, if there is one.
++     *
++     * @param bean definition supplying the properties (may have none)
++     * @param instance object the properties are applied to
++     * @param context execution context used to resolve component references
++     */
++    private static void applyProperties(ObjectDef bean, Object instance, ExecutionContext context) throws
++            IllegalAccessException, InvocationTargetException {
++        List<PropertyDef> props = bean.getProperties();
++        Class clazz = instance.getClass();
++        if (props == null) {
++            return;
++        }
++        for (PropertyDef prop : props) {
++            Object value;
++            if (prop.isReference()) {
++                value = context.getComponent(prop.getRef());
++            } else {
++                value = prop.getValue();
++            }
++
++            Method setter = findSetter(clazz, prop.getName(), value);
++            if (setter == null) {
++                // fall back to a public instance variable
++                LOG.debug("no setter found. Looking for a public instance variable...");
++                Field field = findPublicField(clazz, prop.getName(), value);
++                if (field != null) {
++                    field.set(instance, value);
++                }
++                continue;
++            }
++
++            LOG.debug("found setter, attempting to invoke");
++            setter.invoke(instance, new Object[]{value});
++        }
++    }
++
++    /**
++     * Looks up a public field by name.
++     *
++     * @param clazz class to inspect
++     * @param property name of the field
++     * @param arg value intended for the field (not used for the lookup)
++     * @return the field, or null (after logging a warning) if the class has no such public field
++     */
++    private static Field findPublicField(Class clazz, String property, Object arg) {
++        try {
++            return clazz.getField(property);
++        } catch (NoSuchFieldException e) {
++            LOG.warn("Could not find setter or public variable for property: " + property, e);
++            return null;
++        }
++    }
++
++ private static Method findSetter(Class clazz, String property, Object arg) {
++ String setterName = toSetterName(property);
++ Method retval = null;
++ Method[] methods = clazz.getMethods();
++ for (Method method : methods) {
++ if (setterName.equals(method.getName())) {
++ LOG.debug("Found setter method: " + method.getName());
++ retval = method;
++ }
++ }
++ return retval;
++ }
++
++    /**
++     * Derives the JavaBean setter name for a property, e.g. `name` becomes `setName`.
++     *
++     * The first character is upper-cased with `Locale.ROOT` so that the result does not depend
++     * on the default locale of the JVM (under a Turkish locale `id` would otherwise become
++     * `setİd`, which matches no method).
++     *
++     * @param name property name; must not be empty
++     * @return "set" followed by the property name with its first character upper-cased
++     */
++    private static String toSetterName(String name) {
++        return "set" + name.substring(0, 1).toUpperCase(java.util.Locale.ROOT) + name.substring(1);
++    }
++
++    /**
++     * Produces a copy of the argument list in which every `BeanReference` has been replaced by
++     * the component registered in the context under the reference's id. All other arguments
++     * are copied unchanged.
++     *
++     * @param args arguments that may contain bean references
++     * @param context execution context used to look up components
++     * @return a new list of the same length with references resolved
++     */
++    private static List<Object> resolveReferences(List<Object> args, ExecutionContext context) {
++        LOG.debug("Checking arguments for references.");
++        List<Object> resolved = new ArrayList<Object>();
++        for (Object candidate : args) {
++            Object value = candidate;
++            if (candidate instanceof BeanReference) {
++                BeanReference reference = (BeanReference) candidate;
++                value = context.getComponent(reference.getId());
++            }
++            resolved.add(value);
++        }
++        return resolved;
++    }
++
++    /**
++     * Instantiates the class named by an object definition and configures the new instance.
++     *
++     * Without constructor arguments the no-arg constructor is used. With constructor arguments,
++     * references among them are resolved first, a compatible constructor is searched for and
++     * invoked with the (possibly coerced) arguments. Afterwards the definition's properties are
++     * applied and its configuration methods are invoked, in that order.
++     *
++     * @param def definition of the object to build
++     * @param context execution context used to resolve references
++     * @return the fully configured instance
++     * @throws IllegalArgumentException if no constructor fits the given arguments
++     */
++    private static Object buildObject(ObjectDef def, ExecutionContext context) throws ClassNotFoundException,
++            IllegalAccessException, InstantiationException, NoSuchMethodException, InvocationTargetException {
++        Class clazz = Class.forName(def.getClassName());
++        Object obj;
++        if (!def.hasConstructorArgs()) {
++            obj = clazz.newInstance();
++        } else {
++            LOG.debug("Found constructor arguments in definition: " + def.getConstructorArgs().getClass().getName());
++            List<Object> cArgs = def.getConstructorArgs();
++            if (def.hasReferences()) {
++                cArgs = resolveReferences(cArgs, context);
++            }
++            Constructor con = findCompatibleConstructor(cArgs, clazz);
++            if (con == null) {
++                String msg = String.format("Couldn't find a suitable constructor for class '%s' with arguments '%s'.",
++                        clazz.getName(),
++                        cArgs);
++                throw new IllegalArgumentException(msg);
++            }
++            LOG.debug("Found something seemingly compatible, attempting invocation...");
++            Object[] coerced = getArgsWithListCoercian(cArgs, con.getParameterTypes());
++            obj = con.newInstance(coerced);
++        }
++        applyProperties(def, obj, context);
++        invokeConfigMethods(def, obj, context);
++        return obj;
++    }
++
++    /**
++     * Builds the user-supplied topology source and asks it for the topology.
++     *
++     * The method to call is looked up by the name configured on the topology source definition.
++     * If its parameter type is exactly `Config`, the topology config is copied into a new
++     * `Config` first; otherwise the config is handed over as returned by the topology definition.
++     *
++     * @param def definition of the topology source object
++     * @param context execution context carrying the topology definition
++     * @return the topology returned by the topology source
++     */
++    private static StormTopology buildExternalTopology(ObjectDef def, ExecutionContext context)
++            throws ClassNotFoundException, IllegalAccessException, InstantiationException, NoSuchMethodException,
++            InvocationTargetException {
++
++        Object topologySource = buildObject(def, context);
++
++        String methodName = context.getTopologyDef().getTopologySource().getMethodName();
++        Method getTopology = findGetTopologyMethod(topologySource, methodName);
++
++        Class configParamType = getTopology.getParameterTypes()[0];
++        if (!configParamType.equals(Config.class)) {
++            return (StormTopology) getTopology.invoke(topologySource, context.getTopologyDef().getConfig());
++        }
++
++        Config config = new Config();
++        config.putAll(context.getTopologyDef().getConfig());
++        return (StormTopology) getTopology.invoke(topologySource, config);
++    }
++
++    /**
++     * Builds the object described by the definition and returns it as a `CustomStreamGrouping`.
++     *
++     * @param def definition of the custom grouping class
++     * @param context execution context used while building the object
++     * @return the built grouping
++     * @throws ClassCastException if the built object is not a `CustomStreamGrouping`
++     */
++    private static CustomStreamGrouping buildCustomStreamGrouping(ObjectDef def, ExecutionContext context)
++            throws ClassNotFoundException,
++            IllegalAccessException, InstantiationException, NoSuchMethodException, InvocationTargetException {
++        return (CustomStreamGrouping) buildObject(def, context);
++    }
++
++    /**
++     * Instantiates every component of the topology definition and registers each one in the
++     * execution context under its component id. Does nothing when no components are defined.
++     */
++    private static void buildComponents(ExecutionContext context) throws ClassNotFoundException, NoSuchMethodException,
++            IllegalAccessException, InvocationTargetException, InstantiationException {
++        Collection<BeanDef> componentDefs = context.getTopologyDef().getComponents();
++        if (componentDefs == null) {
++            return;
++        }
++        for (BeanDef componentDef : componentDefs) {
++            Object component = buildObject(componentDef, context);
++            context.addComponent(componentDef.getId(), component);
++        }
++    }
++
++
++    /**
++     * Builds every spout of the topology definition, declares it on the topology builder with
++     * its configured parallelism and registers it in the execution context under its id.
++     */
++    private static void buildSpouts(ExecutionContext context, TopologyBuilder builder) throws ClassNotFoundException,
++            NoSuchMethodException, InvocationTargetException, InstantiationException, IllegalAccessException {
++        for (SpoutDef spoutDef : context.getTopologyDef().getSpouts()) {
++            IRichSpout built = buildSpout(spoutDef, context);
++            builder.setSpout(spoutDef.getId(), built, spoutDef.getParallelism());
++            context.addSpout(spoutDef.getId(), built);
++        }
++    }
++
++    /**
++     * Builds the object described by a spout definition and returns it as an `IRichSpout`.
++     * Construction, including constructor matching and list-to-array conversion, is delegated
++     * to `buildObject`.
++     *
++     * @throws ClassCastException if the built object is not an `IRichSpout`
++     */
++    private static IRichSpout buildSpout(SpoutDef def, ExecutionContext context) throws ClassNotFoundException,
++            IllegalAccessException, InstantiationException, NoSuchMethodException, InvocationTargetException {
++        Object built = buildObject(def, context);
++        return (IRichSpout) built;
++    }
++
++    /**
++     * Builds every bolt of the topology definition and registers it in the execution context
++     * under the id of its definition. Constructor arguments are coerced to a matching bolt
++     * constructor as far as `buildObject` supports it.
++     *
++     * @param context execution context providing the bolt definitions and receiving the bolts
++     */
++    private static void buildBolts(ExecutionContext context) throws ClassNotFoundException, IllegalAccessException,
++            InstantiationException, NoSuchMethodException, InvocationTargetException {
++        for (BoltDef def : context.getTopologyDef().getBolts()) {
++            // buildObject() loads the class itself, so no separate Class.forName() is needed here
++            Object bolt = buildObject(def, context);
++            context.addBolt(def.getId(), bolt);
++        }
++    }
++
++    /**
++     * Searches the declared constructors of the target class for one that takes as many
++     * parameters as there are arguments and can be invoked with them.
++     *
++     * @param args constructor arguments
++     * @param target class whose constructors are examined
++     * @return the last invokable constructor found (a warning is logged if there are several),
++     *         or null if none qualifies
++     */
++    private static Constructor findCompatibleConstructor(List<Object> args, Class target) throws NoSuchMethodException {
++        Constructor match = null;
++        int matches = 0;
++
++        LOG.debug("Target class: {}", target.getName());
++
++        for (Constructor candidate : target.getDeclaredConstructors()) {
++            Class[] paramClasses = candidate.getParameterTypes();
++            if (paramClasses.length != args.size()) {
++                LOG.debug("Skipping constructor with wrong number of arguments.");
++                continue;
++            }
++            LOG.debug("found constructor with same number of args..");
++            boolean invokable = canInvokeWithArgs(args, paramClasses);
++            if (invokable) {
++                match = candidate;
++                matches++;
++            }
++            LOG.debug("** invokable --> {}", invokable);
++        }
++
++        if (matches > 1) {
++            LOG.warn("Found multiple invokable constructors for class {}, given arguments {}. Using the last one found.",
++                    target, args);
++        }
++        return match;
++    }
++
++
++    /**
++     * Invokes the configuration methods of a bean definition on an already constructed instance,
++     * in the order in which they are defined.
++     *
++     * For each method definition, references among the arguments are resolved, a compatible
++     * public method with the configured name is searched for and invoked with the (possibly
++     * coerced) arguments. A method definition without an argument list is treated as having
++     * zero arguments instead of failing with a `NullPointerException`.
++     *
++     * @param bean definition supplying the configuration methods (may have none)
++     * @param instance object the methods are invoked on
++     * @param context execution context used to resolve references
++     * @throws IllegalArgumentException if no compatible method is found for a definition
++     */
++    public static void invokeConfigMethods(ObjectDef bean, Object instance, ExecutionContext context)
++            throws InvocationTargetException, IllegalAccessException {
++
++        List<ConfigMethodDef> methodDefs = bean.getConfigMethods();
++        if (methodDefs == null || methodDefs.isEmpty()) {
++            return;
++        }
++        Class clazz = instance.getClass();
++        for (ConfigMethodDef methodDef : methodDefs) {
++            List<Object> args = methodDef.getArgs();
++            if (args == null) {
++                // no `args` configured: a call without arguments
++                args = new ArrayList<Object>();
++            }
++            if (methodDef.hasReferences()) {
++                args = resolveReferences(args, context);
++            }
++            String methodName = methodDef.getName();
++            Method method = findCompatibleMethod(args, clazz, methodName);
++            if (method != null) {
++                Object[] methodArgs = getArgsWithListCoercian(args, method.getParameterTypes());
++                method.invoke(instance, methodArgs);
++            } else {
++                String msg = String.format("Unable to find configuration method '%s' in class '%s' with arguments %s.",
++                        new Object[]{methodName, clazz.getName(), args});
++                throw new IllegalArgumentException(msg);
++            }
++        }
++    }
++
++    /**
++     * Searches the public methods of the target class for one with the given name that takes as
++     * many parameters as there are arguments and can be invoked with them.
++     *
++     * Methods with a different name are skipped silently; only overloads of the requested
++     * method show up in the debug log.
++     *
++     * @param args method arguments
++     * @param target class whose methods are examined
++     * @param methodName name of the method to look for
++     * @return the last invokable method found (a warning is logged if there are several),
++     *         or null if none qualifies
++     */
++    private static Method findCompatibleMethod(List<Object> args, Class target, String methodName){
++        Method retval = null;
++        int eligibleCount = 0;
++
++        LOG.debug("Target class: {}", target.getName());
++
++        for (Method method : target.getMethods()) {
++            if (!method.getName().equals(methodName)) {
++                // not an overload of the requested method
++                continue;
++            }
++            Class[] paramClasses = method.getParameterTypes();
++            if (paramClasses.length != args.size()) {
++                LOG.debug("Skipping method with wrong number of arguments.");
++                continue;
++            }
++            LOG.debug("found method with same number of args..");
++            boolean invokable = canInvokeWithArgs(args, paramClasses);
++            if (invokable) {
++                retval = method;
++                eligibleCount++;
++            }
++            LOG.debug("** invokable --> {}", invokable);
++        }
++        if (eligibleCount > 1) {
++            LOG.warn("Found multiple invokable methods for class {}, method {}, given arguments {}. " +
++                            "Using the last one found.",
++                    new Object[]{target, methodName, args});
++        }
++        return retval;
++    }
++
++    /**
++     * Given a java.util.List of constructor/method arguments and the parameter types of the
++     * constructor/method, converts the list into an Object array that can be used for the
++     * reflective invocation.
++     *
++     * Conversions performed per argument:
++     * - a `Number` is converted to the primitive numeric type of the parameter,
++     * - a `String` is converted to the enum constant of that name for an enum parameter,
++     * - a `List` is copied into a new array for an array parameter.
++     *
++     * Every other argument, including null, is passed on unchanged so that the reflective call
++     * itself accepts it (e.g. by unboxing) or rejects it with an `IllegalArgumentException`;
++     * an argument is never silently replaced by null.
++     *
++     * @param args arguments as read from the definition
++     * @param parameterTypes parameter types of the constructor/method to be invoked
++     * @return the arguments in invocation form, one per parameter
++     * @throws IllegalArgumentException if the number of arguments differs from the number of
++     *         parameters
++     */
++    private static Object[] getArgsWithListCoercian(List<Object> args, Class[] parameterTypes) {
++        if (parameterTypes.length != args.size()) {
++            throw new IllegalArgumentException("Contructor parameter count does not egual argument size.");
++        }
++        Object[] constructorParams = new Object[args.size()];
++
++        // loop through the arguments, if we hit a list that has to be converted to an array,
++        // perform the conversion
++        for (int i = 0; i < args.size(); i++) {
++            Object obj = args.get(i);
++            Class paramType = parameterTypes[i];
++            if (obj == null) {
++                // nothing to inspect or convert; reflection rejects null for primitive parameters
++                constructorParams[i] = null;
++                continue;
++            }
++            Class objectType = obj.getClass();
++            LOG.debug("Comparing parameter class {} to object class {} to see if assignment is possible.",
++                    paramType, objectType);
++            if (paramType.equals(objectType)) {
++                LOG.debug("They are the same class.");
++                constructorParams[i] = obj;
++                continue;
++            }
++            if (paramType.isAssignableFrom(objectType)) {
++                LOG.debug("Assignment is possible.");
++                constructorParams[i] = obj;
++                continue;
++            }
++            if (isPrimitiveNumber(paramType) && Number.class.isAssignableFrom(objectType)) {
++                LOG.debug("Its a primitive number.");
++                Number num = (Number) obj;
++                if (paramType == Float.TYPE) {
++                    constructorParams[i] = num.floatValue();
++                } else if (paramType == Double.TYPE) {
++                    constructorParams[i] = num.doubleValue();
++                } else if (paramType == Long.TYPE) {
++                    constructorParams[i] = num.longValue();
++                } else if (paramType == Integer.TYPE) {
++                    constructorParams[i] = num.intValue();
++                } else if (paramType == Short.TYPE) {
++                    constructorParams[i] = num.shortValue();
++                } else if (paramType == Byte.TYPE) {
++                    constructorParams[i] = num.byteValue();
++                } else {
++                    constructorParams[i] = obj;
++                }
++                continue;
++            }
++
++            // enum conversion
++            if (paramType.isEnum() && objectType.equals(String.class)) {
++                LOG.debug("Yes, will convert a String to enum");
++                constructorParams[i] = Enum.valueOf(paramType, (String) obj);
++                continue;
++            }
++
++            // List to array conversion
++            if (paramType.isArray() && List.class.isAssignableFrom(objectType)) {
++                // TODO more collection content type checking
++                LOG.debug("Conversion appears possible...");
++                List list = (List) obj;
++                // the list may be empty or start with null, so guard the lookup used for logging
++                Object firstElementType = null;
++                if (!list.isEmpty() && list.get(0) != null) {
++                    firstElementType = list.get(0).getClass();
++                }
++                LOG.debug("Array Type: {}, List type: {}", paramType.getComponentType(), firstElementType);
++
++                // create an array of the right type
++                Object newArrayObj = Array.newInstance(paramType.getComponentType(), list.size());
++                for (int j = 0; j < list.size(); j++) {
++                    Array.set(newArrayObj, j, list.get(j));
++                }
++                constructorParams[i] = newArrayObj;
++                LOG.debug("After conversion: {}", constructorParams[i]);
++                continue;
++            }
++
++            // no conversion applies: hand the argument over as-is and let the reflective
++            // invocation decide, rather than silently passing null in its place
++            constructorParams[i] = obj;
++        }
++        return constructorParams;
++    }
++
++
++ /**
++ * Determine if the given constructor/method parameter types are compatible given arguments List. Consider if
++ * list coercian can make it possible.
++ *
++ * @param args
++ * @param parameterTypes
++ * @return
++ */
++ private static boolean canInvokeWithArgs(List<Object> args, Class[] parameterTypes) {
++ if (parameterTypes.length != args.size()) {
++ LOG.warn("parameter types were the wrong size");
++ return false;
++ }
++
++ for (int i = 0; i < args.size(); i++) {
++ Object obj = args.get(i);
++ Class paramType = parameterTypes[i];
++ Class objectType = obj.getClass();
++ LOG.debug("Comparing parameter class {} to object class {} to see if assignment is possible.",
++ paramType, objectType);
++ if (paramType.equals(objectType)) {
++ LOG.debug("Yes, they are the same class.");
++ return true;
++ }
++ if (paramType.isAssignableFrom(objectType)) {
++ LOG.debug("Yes, assignment is possible.");
++ return true;
++ }
++ if(isPrimitiveNumber(paramType) && Number.class.isAssignableFrom(objectType)){
++ return true;
++ }
++ if(paramType.isEnum() && objectType.equals(String.class)){
++ LOG.debug("Yes, will convert a String to enum");
++ return true;
++ }
++ if (paramType.isArray() && List.class.isAssignableFrom(objectType)) {
++ // TODO more collection content type checking
++ LOG.debug("Assignment is possible if we convert a List to an array.");
++ LOG.debug("Array Type: {}, List type: {}", paramType.getComponentType(), ((List) obj).get(0).getClass());
++
++ return true;
++ }
++ return false;
++ }
++ return false;
++ }
++
++    /**
++     * Tells whether a class is one of the primitive numeric types
++     * (`byte`, `short`, `int`, `long`, `float`, `double`).
++     *
++     * `boolean`, `char` and `void` are primitive as well but are not numbers a `Number` could
++     * be converted to, so they are excluded.
++     *
++     * @param clazz class to test
++     * @return true if the class is a primitive numeric type
++     */
++    public static boolean isPrimitiveNumber(Class clazz){
++        return clazz.isPrimitive()
++                && !clazz.equals(boolean.class)
++                && !clazz.equals(char.class)
++                && !clazz.equals(void.class);
++    }
++}
++
http://git-wip-us.apache.org/repos/asf/storm/blob/b21a98dd/external/flux/flux-core/src/main/java/org/apache/storm/flux/api/TopologySource.java
----------------------------------------------------------------------
diff --cc external/flux/flux-core/src/main/java/org/apache/storm/flux/api/TopologySource.java
index 0000000,0000000..fbccfb7
new file mode 100644
--- /dev/null
+++ b/external/flux/flux-core/src/main/java/org/apache/storm/flux/api/TopologySource.java
@@@ -1,0 -1,0 +1,39 @@@
++/*
++ * Licensed to the Apache Software Foundation (ASF) under one
++ * or more contributor license agreements. See the NOTICE file
++ * distributed with this work for additional information
++ * regarding copyright ownership. The ASF licenses this file
++ * to you under the Apache License, Version 2.0 (the
++ * "License"); you may not use this file except in compliance
++ * with the License. You may obtain a copy of the License at
++ *
++ * http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing, software
++ * distributed under the License is distributed on an "AS IS" BASIS,
++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
++ * See the License for the specific language governing permissions and
++ * limitations under the License.
++ */
++package org.apache.storm.flux.api;
++
++
++import backtype.storm.generated.StormTopology;
++
++import java.util.Map;
++
++/**
++ * Contract for objects that can produce `StormTopology` objects.
++ *
++ * If a `topology-source` class implements the `getTopology()` method, Flux will
++ * call that method. Otherwise, it will introspect the given class and look for a
++ * similar method that produces a `StormTopology` instance.
++ *
++ * Implementing this interface is therefore not strictly necessary: if a class
++ * defines a method with a similar signature, Flux should be able to find and
++ * invoke it.
++ */
++public interface TopologySource {
++    /**
++     * Produces the topology to run.
++     *
++     * @param config configuration map handed to the topology source
++     * @return the topology built by this source
++     */
++    StormTopology getTopology(Map<String, Object> config);
++}
http://git-wip-us.apache.org/repos/asf/storm/blob/b21a98dd/external/flux/flux-core/src/main/java/org/apache/storm/flux/model/BeanDef.java
----------------------------------------------------------------------
diff --cc external/flux/flux-core/src/main/java/org/apache/storm/flux/model/BeanDef.java
index 0000000,0000000..72ca5ae
new file mode 100644
--- /dev/null
+++ b/external/flux/flux-core/src/main/java/org/apache/storm/flux/model/BeanDef.java
@@@ -1,0 -1,0 +1,39 @@@
++/*
++ * Licensed to the Apache Software Foundation (ASF) under one
++ * or more contributor license agreements. See the NOTICE file
++ * distributed with this work for additional information
++ * regarding copyright ownership. The ASF licenses this file
++ * to you under the Apache License, Version 2.0 (the
++ * "License"); you may not use this file except in compliance
++ * with the License. You may obtain a copy of the License at
++ *
++ * http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing, software
++ * distributed under the License is distributed on an "AS IS" BASIS,
++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
++ * See the License for the specific language governing permissions and
++ * limitations under the License.
++ */
++package org.apache.storm.flux.model;
++
++import java.util.ArrayList;
++import java.util.LinkedHashMap;
++import java.util.List;
++import java.util.Map;
++
++/**
++ * A representation of a Java object that is uniquely identifiable by its id and that, given
++ * a className, constructor arguments, and properties, can be instantiated.
++ */
++public class BeanDef extends ObjectDef {
++    /** Id identifying this bean. */
++    private String id;
++
++    /**
++     * @return the id of this bean
++     */
++    public String getId() {
++        return this.id;
++    }
++
++    /**
++     * @param id the id of this bean
++     */
++    public void setId(String id) {
++        this.id = id;
++    }
++}
http://git-wip-us.apache.org/repos/asf/storm/blob/b21a98dd/external/flux/flux-core/src/main/java/org/apache/storm/flux/model/BeanReference.java
----------------------------------------------------------------------
diff --cc external/flux/flux-core/src/main/java/org/apache/storm/flux/model/BeanReference.java
index 0000000,0000000..bd236f1
new file mode 100644
--- /dev/null
+++ b/external/flux/flux-core/src/main/java/org/apache/storm/flux/model/BeanReference.java
@@@ -1,0 -1,0 +1,39 @@@
++/*
++ * Licensed to the Apache Software Foundation (ASF) under one
++ * or more contributor license agreements. See the NOTICE file
++ * distributed with this work for additional information
++ * regarding copyright ownership. The ASF licenses this file
++ * to you under the Apache License, Version 2.0 (the
++ * "License"); you may not use this file except in compliance
++ * with the License. You may obtain a copy of the License at
++ *
++ * http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing, software
++ * distributed under the License is distributed on an "AS IS" BASIS,
++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
++ * See the License for the specific language governing permissions and
++ * limitations under the License.
++ */
++package org.apache.storm.flux.model;
++
++/**
++ * Pointer to another bean: holds nothing but the id of the bean being referred to.
++ */
++public class BeanReference {
++    /** Id of the referenced bean. */
++    public String id;
++
++    /** Creates a reference whose id has not been set yet. */
++    public BeanReference() {
++    }
++
++    /**
++     * Creates a reference to the bean with the given id.
++     *
++     * @param id id of the referenced bean
++     */
++    public BeanReference(String id) {
++        this.id = id;
++    }
++
++    /**
++     * @return id of the referenced bean
++     */
++    public String getId() {
++        return this.id;
++    }
++
++    /**
++     * @param id id of the referenced bean
++     */
++    public void setId(String id) {
++        this.id = id;
++    }
++}
http://git-wip-us.apache.org/repos/asf/storm/blob/b21a98dd/external/flux/flux-core/src/main/java/org/apache/storm/flux/model/BoltDef.java
----------------------------------------------------------------------
diff --cc external/flux/flux-core/src/main/java/org/apache/storm/flux/model/BoltDef.java
index 0000000,0000000..362abf1
new file mode 100644
--- /dev/null
+++ b/external/flux/flux-core/src/main/java/org/apache/storm/flux/model/BoltDef.java
@@@ -1,0 -1,0 +1,24 @@@
++/*
++ * Licensed to the Apache Software Foundation (ASF) under one
++ * or more contributor license agreements. See the NOTICE file
++ * distributed with this work for additional information
++ * regarding copyright ownership. The ASF licenses this file
++ * to you under the Apache License, Version 2.0 (the
++ * "License"); you may not use this file except in compliance
++ * with the License. You may obtain a copy of the License at
++ *
++ * http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing, software
++ * distributed under the License is distributed on an "AS IS" BASIS,
++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
++ * See the License for the specific language governing permissions and
++ * limitations under the License.
++ */
++package org.apache.storm.flux.model;
++
++/**
++ * Bean representation of a Storm bolt.
++ *
++ * Declares no members of its own; everything is inherited from `VertexDef`.
++ */
++public class BoltDef extends VertexDef {
++}
http://git-wip-us.apache.org/repos/asf/storm/blob/b21a98dd/external/flux/flux-core/src/main/java/org/apache/storm/flux/model/ConfigMethodDef.java
----------------------------------------------------------------------
diff --cc external/flux/flux-core/src/main/java/org/apache/storm/flux/model/ConfigMethodDef.java
index 0000000,0000000..6f7e4d4
new file mode 100644
--- /dev/null
+++ b/external/flux/flux-core/src/main/java/org/apache/storm/flux/model/ConfigMethodDef.java
@@@ -1,0 -1,0 +1,62 @@@
++/*
++ * Licensed to the Apache Software Foundation (ASF) under one
++ * or more contributor license agreements. See the NOTICE file
++ * distributed with this work for additional information
++ * regarding copyright ownership. The ASF licenses this file
++ * to you under the Apache License, Version 2.0 (the
++ * "License"); you may not use this file except in compliance
++ * with the License. You may obtain a copy of the License at
++ *
++ * http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing, software
++ * distributed under the License is distributed on an "AS IS" BASIS,
++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
++ * See the License for the specific language governing permissions and
++ * limitations under the License.
++ */
++package org.apache.storm.flux.model;
++
++import java.util.ArrayList;
++import java.util.LinkedHashMap;
++import java.util.List;
++import java.util.Map;
++
++public class ConfigMethodDef {
++ private String name;
++ private List<Object> args;
++ private boolean hasReferences = false;
++
++ public String getName() {
++ return name;
++ }
++
++ public void setName(String name) {
++ this.name = name;
++ }
++
++ public List<Object> getArgs() {
++ return args;
++ }
++
++ public void setArgs(List<Object> args) {
++
++ List<Object> newVal = new ArrayList<Object>();
++ for(Object obj : args){
++ if(obj instanceof LinkedHashMap){
++ Map map = (Map)obj;
++ if(map.containsKey("ref") && map.size() == 1){
++ newVal.add(new BeanReference((String)map.get("ref")));
++ this.hasReferences = true;
++ }
++ } else {
++ newVal.add(obj);
++ }
++ }
++ this.args = newVal;
++ }
++
++ public boolean hasReferences(){
++ return this.hasReferences;
++ }
++}
http://git-wip-us.apache.org/repos/asf/storm/blob/b21a98dd/external/flux/flux-core/src/main/java/org/apache/storm/flux/model/ExecutionContext.java
----------------------------------------------------------------------
diff --cc external/flux/flux-core/src/main/java/org/apache/storm/flux/model/ExecutionContext.java
index 0000000,0000000..e94b887
new file mode 100644
--- /dev/null
+++ b/external/flux/flux-core/src/main/java/org/apache/storm/flux/model/ExecutionContext.java
@@@ -1,0 -1,0 +1,77 @@@
++/*
++ * Licensed to the Apache Software Foundation (ASF) under one
++ * or more contributor license agreements. See the NOTICE file
++ * distributed with this work for additional information
++ * regarding copyright ownership. The ASF licenses this file
++ * to you under the Apache License, Version 2.0 (the
++ * "License"); you may not use this file except in compliance
++ * with the License. You may obtain a copy of the License at
++ *
++ * http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing, software
++ * distributed under the License is distributed on an "AS IS" BASIS,
++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
++ * See the License for the specific language governing permissions and
++ * limitations under the License.
++ */
++package org.apache.storm.flux.model;
++
++import backtype.storm.Config;
++import backtype.storm.task.IBolt;
++import backtype.storm.topology.IRichSpout;
++
++import java.util.HashMap;
++import java.util.List;
++import java.util.Map;
++
++/**
++ * Container for all the objects required to instantiate a topology.
++ *
++ * It carries the parsed topology definition and the Storm config it was created
++ * with, and keeps id-indexed registries of the components, spouts and bolts that
++ * are added to it.
++ */
++public class ExecutionContext {
++    // parsed Topology definition
++    TopologyDef topologyDef;
++
++    // Storm config
++    private Config config;
++
++    // declared, but neither read nor written anywhere in this class
++    private List<Object> compontents;
++    private List<IBolt> bolts;
++
++    // registries, each indexed by id
++    private Map<String, Object> componentMap = new HashMap<String, Object>();
++    private Map<String, IRichSpout> spoutMap = new HashMap<String, IRichSpout>();
++    private Map<String, Object> boltMap = new HashMap<String, Object>();
++
++    /**
++     * @param topologyDef the parsed topology definition
++     * @param config the Storm config
++     */
++    public ExecutionContext(TopologyDef topologyDef, Config config){
++        this.config = config;
++        this.topologyDef = topologyDef;
++    }
++
++    /**
++     * @return the topology definition passed to the constructor
++     */
++    public TopologyDef getTopologyDef(){
++        return this.topologyDef;
++    }
++
++    /**
++     * Registers a component under the given id, replacing any previous entry for that id.
++     */
++    public void addComponent(String id, Object value){
++        this.componentMap.put(id, value);
++    }
++
++    /**
++     * @return the component registered under `id`, or `null` if there is none
++     */
++    public Object getComponent(String id){
++        return this.componentMap.get(id);
++    }
++
++    /**
++     * Registers a spout under the given id, replacing any previous entry for that id.
++     */
++    public void addSpout(String id, IRichSpout spout){
++        this.spoutMap.put(id, spout);
++    }
++
++    /**
++     * Registers a bolt under the given id, replacing any previous entry for that id.
++     */
++    public void addBolt(String id, Object bolt){
++        this.boltMap.put(id, bolt);
++    }
++
++    /**
++     * @return the bolt registered under `id`, or `null` if there is none
++     */
++    public Object getBolt(String id){
++        return this.boltMap.get(id);
++    }
++
++}
http://git-wip-us.apache.org/repos/asf/storm/blob/b21a98dd/external/flux/flux-core/src/main/java/org/apache/storm/flux/model/GroupingDef.java
----------------------------------------------------------------------
diff --cc external/flux/flux-core/src/main/java/org/apache/storm/flux/model/GroupingDef.java
index 0000000,0000000..e4fac8e
new file mode 100644
--- /dev/null
+++ b/external/flux/flux-core/src/main/java/org/apache/storm/flux/model/GroupingDef.java
@@@ -1,0 -1,0 +1,77 @@@
++/*
++ * Licensed to the Apache Software Foundation (ASF) under one
++ * or more contributor license agreements. See the NOTICE file
++ * distributed with this work for additional information
++ * regarding copyright ownership. The ASF licenses this file
++ * to you under the Apache License, Version 2.0 (the
++ * "License"); you may not use this file except in compliance
++ * with the License. You may obtain a copy of the License at
++ *
++ * http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing, software
++ * distributed under the License is distributed on an "AS IS" BASIS,
++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
++ * See the License for the specific language governing permissions and
++ * limitations under the License.
++ */
++package org.apache.storm.flux.model;
++
++import java.util.List;
++
++/**
++ * Bean representation of a Storm stream grouping: the grouping type, the stream id,
++ * a list of string arguments and an optional custom class definition.
++ */
++public class GroupingDef {
++
++    /**
++     * Types of stream groupings Storm allows
++     */
++    public enum Type {
++        ALL,
++        CUSTOM,
++        DIRECT,
++        SHUFFLE,
++        LOCAL_OR_SHUFFLE,
++        FIELDS,
++        GLOBAL,
++        NONE
++    }
++
++    private Type type;
++    private String streamId;
++    private List<String> args;
++    private ObjectDef customClass;
++
++    /** @return the grouping type, or `null` if not set */
++    public Type getType() {
++        return type;
++    }
++
++    public void setType(Type type) {
++        this.type = type;
++    }
++
++    /** @return the stream id, or `null` if not set */
++    public String getStreamId() {
++        return streamId;
++    }
++
++    public void setStreamId(String streamId) {
++        this.streamId = streamId;
++    }
++
++    /** @return the grouping arguments, or `null` if not set */
++    public List<String> getArgs() {
++        return args;
++    }
++
++    public void setArgs(List<String> args) {
++        this.args = args;
++    }
++
++    /** @return the custom class definition, or `null` if not set */
++    public ObjectDef getCustomClass() {
++        return customClass;
++    }
++
++    public void setCustomClass(ObjectDef customClass) {
++        this.customClass = customClass;
++    }
++}
http://git-wip-us.apache.org/repos/asf/storm/blob/b21a98dd/external/flux/flux-core/src/main/java/org/apache/storm/flux/model/IncludeDef.java
----------------------------------------------------------------------
diff --cc external/flux/flux-core/src/main/java/org/apache/storm/flux/model/IncludeDef.java
index 0000000,0000000..23fd9d2
new file mode 100644
--- /dev/null
+++ b/external/flux/flux-core/src/main/java/org/apache/storm/flux/model/IncludeDef.java
@@@ -1,0 -1,0 +1,54 @@@
++/*
++ * Licensed to the Apache Software Foundation (ASF) under one
++ * or more contributor license agreements. See the NOTICE file
++ * distributed with this work for additional information
++ * regarding copyright ownership. The ASF licenses this file
++ * to you under the Apache License, Version 2.0 (the
++ * "License"); you may not use this file except in compliance
++ * with the License. You may obtain a copy of the License at
++ *
++ * http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing, software
++ * distributed under the License is distributed on an "AS IS" BASIS,
++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
++ * See the License for the specific language governing permissions and
++ * limitations under the License.
++ */
++package org.apache.storm.flux.model;
++
++/**
++ * Represents an include. Includes can be either a file or a classpath resource.
++ *
++ * If an include is marked as `override=true` then existing properties will be replaced.
++ *
++ * Both flags default to `false`.
++ */
++public class IncludeDef {
++    private String file;
++    private boolean resource = false;
++    boolean override = false;
++
++    /** @return the include location, or `null` if not set */
++    public String getFile() {
++        return this.file;
++    }
++
++    public void setFile(String file) {
++        this.file = file;
++    }
++
++    /** @return `true` if the include is a classpath resource rather than a file */
++    public boolean isResource() {
++        return this.resource;
++    }
++
++    public void setResource(boolean resource) {
++        this.resource = resource;
++    }
++
++    /** @return `true` if this include is marked as `override=true` */
++    public boolean isOverride() {
++        return this.override;
++    }
++
++    public void setOverride(boolean override) {
++        this.override = override;
++    }
++}
http://git-wip-us.apache.org/repos/asf/storm/blob/b21a98dd/external/flux/flux-core/src/main/java/org/apache/storm/flux/model/ObjectDef.java
----------------------------------------------------------------------
diff --cc external/flux/flux-core/src/main/java/org/apache/storm/flux/model/ObjectDef.java
index 0000000,0000000..7386900
new file mode 100644
--- /dev/null
+++ b/external/flux/flux-core/src/main/java/org/apache/storm/flux/model/ObjectDef.java
@@@ -1,0 -1,0 +1,90 @@@
++/*
++ * Licensed to the Apache Software Foundation (ASF) under one
++ * or more contributor license agreements. See the NOTICE file
++ * distributed with this work for additional information
++ * regarding copyright ownership. The ASF licenses this file
++ * to you under the Apache License, Version 2.0 (the
++ * "License"); you may not use this file except in compliance
++ * with the License. You may obtain a copy of the License at
++ *
++ * http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing, software
++ * distributed under the License is distributed on an "AS IS" BASIS,
++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
++ * See the License for the specific language governing permissions and
++ * limitations under the License.
++ */
++package org.apache.storm.flux.model;
++
++import backtype.storm.Config;
++
++import java.util.ArrayList;
++import java.util.LinkedHashMap;
++import java.util.List;
++import java.util.Map;
++
++/**
++ * A representation of a Java object that given a className, constructor arguments,
++ * and properties, can be instantiated.
++ */
++public class ObjectDef {
++ private String className;
++ private List<Object> constructorArgs;
++ private boolean hasReferences;
++ private List<PropertyDef> properties;
++ private List<ConfigMethodDef> configMethods;
++
++ public String getClassName() {
++ return className;
++ }
++
++ public void setClassName(String className) {
++ this.className = className;
++ }
++
++ public List<Object> getConstructorArgs() {
++ return constructorArgs;
++ }
++
++ public void setConstructorArgs(List<Object> constructorArgs) {
++
++ List<Object> newVal = new ArrayList<Object>();
++ for(Object obj : constructorArgs){
++ if(obj instanceof LinkedHashMap){
++ Map map = (Map)obj;
++ if(map.containsKey("ref") && map.size() == 1){
++ newVal.add(new BeanReference((String)map.get("ref")));
++ this.hasReferences = true;
++ }
++ } else {
++ newVal.add(obj);
++ }
++ }
++ this.constructorArgs = newVal;
++ }
++
++ public boolean hasConstructorArgs(){
++ return this.constructorArgs != null && this.constructorArgs.size() > 0;
++ }
++
++ public boolean hasReferences(){
++ return this.hasReferences;
++ }
++
++ public List<PropertyDef> getProperties() {
++ return properties;
++ }
++
++ public void setProperties(List<PropertyDef> properties) {
++ this.properties = properties;
++ }
++
++ public List<ConfigMethodDef> getConfigMethods() {
++ return configMethods;
++ }
++
++ public void setConfigMethods(List<ConfigMethodDef> configMethods) {
++ this.configMethods = configMethods;
++ }
++}
http://git-wip-us.apache.org/repos/asf/storm/blob/b21a98dd/external/flux/flux-core/src/main/java/org/apache/storm/flux/model/PropertyDef.java
----------------------------------------------------------------------
diff --cc external/flux/flux-core/src/main/java/org/apache/storm/flux/model/PropertyDef.java
index 0000000,0000000..f3d7704
new file mode 100644
--- /dev/null
+++ b/external/flux/flux-core/src/main/java/org/apache/storm/flux/model/PropertyDef.java
@@@ -1,0 -1,0 +1,58 @@@
++/*
++ * Licensed to the Apache Software Foundation (ASF) under one
++ * or more contributor license agreements. See the NOTICE file
++ * distributed with this work for additional information
++ * regarding copyright ownership. The ASF licenses this file
++ * to you under the Apache License, Version 2.0 (the
++ * "License"); you may not use this file except in compliance
++ * with the License. You may obtain a copy of the License at
++ *
++ * http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing, software
++ * distributed under the License is distributed on an "AS IS" BASIS,
++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
++ * See the License for the specific language governing permissions and
++ * limitations under the License.
++ */
++package org.apache.storm.flux.model;
++
++/**
++ * Bean representation of a named property that carries either a literal value or a
++ * reference id -- never both. Whichever of the two is set first wins; attempting to
++ * set the other afterwards fails with an `IllegalStateException`.
++ */
++public class PropertyDef {
++    private String name;
++    private Object value;
++    private String ref;
++
++    public String getName() {
++        return this.name;
++    }
++
++    public void setName(String name) {
++        this.name = name;
++    }
++
++    public Object getValue() {
++        return this.value;
++    }
++
++    /**
++     * @throws IllegalStateException if a reference has already been set
++     */
++    public void setValue(Object value) {
++        rejectIf(this.ref != null);
++        this.value = value;
++    }
++
++    public String getRef() {
++        return this.ref;
++    }
++
++    /**
++     * @throws IllegalStateException if a value has already been set
++     */
++    public void setRef(String ref) {
++        rejectIf(this.value != null);
++        this.ref = ref;
++    }
++
++    /**
++     * @return `true` if this property holds a reference
++     */
++    public boolean isReference(){
++        return null != this.ref;
++    }
++
++    // Shared guard for the value/reference exclusivity rule.
++    private static void rejectIf(boolean otherAlreadySet) {
++        if (otherAlreadySet) {
++            throw new IllegalStateException("A property can only have a value OR a reference, not both.");
++        }
++    }
++}
http://git-wip-us.apache.org/repos/asf/storm/blob/b21a98dd/external/flux/flux-core/src/main/java/org/apache/storm/flux/model/SpoutDef.java
----------------------------------------------------------------------
diff --cc external/flux/flux-core/src/main/java/org/apache/storm/flux/model/SpoutDef.java
index 0000000,0000000..277c601
new file mode 100644
--- /dev/null
+++ b/external/flux/flux-core/src/main/java/org/apache/storm/flux/model/SpoutDef.java
@@@ -1,0 -1,0 +1,24 @@@
++/*
++ * Licensed to the Apache Software Foundation (ASF) under one
++ * or more contributor license agreements. See the NOTICE file
++ * distributed with this work for additional information
++ * regarding copyright ownership. The ASF licenses this file
++ * to you under the Apache License, Version 2.0 (the
++ * "License"); you may not use this file except in compliance
++ * with the License. You may obtain a copy of the License at
++ *
++ * http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing, software
++ * distributed under the License is distributed on an "AS IS" BASIS,
++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
++ * See the License for the specific language governing permissions and
++ * limitations under the License.
++ */
++package org.apache.storm.flux.model;
++
++/**
++ * Bean representation of a Storm spout.
++ *
++ * Declares no members of its own; everything it offers is inherited from `VertexDef`.
++ */
++public class SpoutDef extends VertexDef {
++}
http://git-wip-us.apache.org/repos/asf/storm/blob/b21a98dd/external/flux/flux-core/src/main/java/org/apache/storm/flux/model/StreamDef.java
----------------------------------------------------------------------
diff --cc external/flux/flux-core/src/main/java/org/apache/storm/flux/model/StreamDef.java
index 0000000,0000000..da80f1c
new file mode 100644
--- /dev/null
+++ b/external/flux/flux-core/src/main/java/org/apache/storm/flux/model/StreamDef.java
@@@ -1,0 -1,0 +1,64 @@@
++/*
++ * Licensed to the Apache Software Foundation (ASF) under one
++ * or more contributor license agreements. See the NOTICE file
++ * distributed with this work for additional information
++ * regarding copyright ownership. The ASF licenses this file
++ * to you under the Apache License, Version 2.0 (the
++ * "License"); you may not use this file except in compliance
++ * with the License. You may obtain a copy of the License at
++ *
++ * http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing, software
++ * distributed under the License is distributed on an "AS IS" BASIS,
++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
++ * See the License for the specific language governing permissions and
++ * limitations under the License.
++ */
++package org.apache.storm.flux.model;
++
++/**
++ * Represents a stream of tuples from one Storm component (Spout or Bolt) to another (an edge in the topology DAG).
++ *
++ * Required fields are `from` and `to`, which define the source and destination, and the stream `grouping`.
++ *
++ */
++public class StreamDef {
++
++    private String from;
++    private String to;
++    private GroupingDef grouping;
++    private String name; // not used, placeholder for GUI, etc.
++
++    /** @return the source of the stream */
++    public String getFrom() {
++        return this.from;
++    }
++
++    public void setFrom(String from) {
++        this.from = from;
++    }
++
++    /** @return the destination of the stream */
++    public String getTo() {
++        return this.to;
++    }
++
++    public void setTo(String to) {
++        this.to = to;
++    }
++
++    /** @return the stream grouping */
++    public GroupingDef getGrouping() {
++        return this.grouping;
++    }
++
++    public void setGrouping(GroupingDef grouping) {
++        this.grouping = grouping;
++    }
++
++    /** @return the stream name (a placeholder, see the field comment) */
++    public String getName() {
++        return this.name;
++    }
++
++    public void setName(String name) {
++        this.name = name;
++    }
++}
http://git-wip-us.apache.org/repos/asf/storm/blob/b21a98dd/external/flux/flux-core/src/main/java/org/apache/storm/flux/model/TopologyDef.java
----------------------------------------------------------------------
diff --cc external/flux/flux-core/src/main/java/org/apache/storm/flux/model/TopologyDef.java
index 0000000,0000000..a6ae450
new file mode 100644
--- /dev/null
+++ b/external/flux/flux-core/src/main/java/org/apache/storm/flux/model/TopologyDef.java
@@@ -1,0 -1,0 +1,216 @@@
++/*
++ * Licensed to the Apache Software Foundation (ASF) under one
++ * or more contributor license agreements. See the NOTICE file
++ * distributed with this work for additional information
++ * regarding copyright ownership. The ASF licenses this file
++ * to you under the Apache License, Version 2.0 (the
++ * "License"); you may not use this file except in compliance
++ * with the License. You may obtain a copy of the License at
++ *
++ * http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing, software
++ * distributed under the License is distributed on an "AS IS" BASIS,
++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
++ * See the License for the specific language governing permissions and
++ * limitations under the License.
++ */
++package org.apache.storm.flux.model;
++
++import org.slf4j.Logger;
++import org.slf4j.LoggerFactory;
++
++import java.util.*;
++
++/**
++ * Bean representation of a topology.
++ *
++ * It consists of the following:
++ * 1. The topology name
++ * 2. A `java.util.Map` representing the `backtype.storm.config` for the topology
++ * 3. A list of spout definitions
++ * 4. A list of bolt definitions
++ * 5. A list of stream definitions that define the flow between spouts and bolts.
++ *
++ * Alternatively, a "topology source" may be given instead of spouts/bolts/streams;
++ * see {@link #validate()}.
++ */
++public class TopologyDef {
++    private static final Logger LOG = LoggerFactory.getLogger(TopologyDef.class);
++
++    private String name;
++    private Map<String, BeanDef> componentMap = new LinkedHashMap<String, BeanDef>(); // not required
++    private List<IncludeDef> includes; // not required
++    private Map<String, Object> config = new HashMap<String, Object>();
++
++    // a "topology source" is a class that can produce a `StormTopology` thrift object.
++    private TopologySourceDef topologySource;
++
++    // the following are required if we're defining a core storm topology DAG in YAML, etc.
++    private Map<String, BoltDef> boltMap = new LinkedHashMap<String, BoltDef>();
++    private Map<String, SpoutDef> spoutMap = new LinkedHashMap<String, SpoutDef>();
++    private List<StreamDef> streams = new ArrayList<StreamDef>();
++
++
++    public String getName() {
++        return name;
++    }
++
++    public void setName(String name) {
++        this.name = name;
++    }
++
++    /**
++     * Sets the name only if none is set yet or `override` is `true`; otherwise logs a
++     * warning and leaves the current name untouched.
++     */
++    public void setName(String name, boolean override){
++        if(this.name == null || override){
++            this.name = name;
++        } else {
++            LOG.warn("Ignoring attempt to set property 'name' with override == false.");
++        }
++    }
++
++    /**
++     * @return a new list holding the spout definitions in insertion order
++     */
++    public List<SpoutDef> getSpouts() {
++        return new ArrayList<SpoutDef>(this.spoutMap.values());
++    }
++
++    /**
++     * Replaces all spout definitions, indexing them by id.
++     *
++     * @param spouts the new spouts; `null` is treated as an empty list
++     */
++    public void setSpouts(List<SpoutDef> spouts) {
++        this.spoutMap = new LinkedHashMap<String, SpoutDef>();
++        if(spouts == null){
++            return;
++        }
++        for(SpoutDef spout : spouts){
++            this.spoutMap.put(spout.getId(), spout);
++        }
++    }
++
++    /**
++     * @return a new list holding the bolt definitions in insertion order
++     */
++    public List<BoltDef> getBolts() {
++        return new ArrayList<BoltDef>(this.boltMap.values());
++    }
++
++    /**
++     * Replaces all bolt definitions, indexing them by id.
++     *
++     * @param bolts the new bolts; `null` is treated as an empty list
++     */
++    public void setBolts(List<BoltDef> bolts) {
++        this.boltMap = new LinkedHashMap<String, BoltDef>();
++        if(bolts == null){
++            return;
++        }
++        for(BoltDef bolt : bolts){
++            this.boltMap.put(bolt.getId(), bolt);
++        }
++    }
++
++    public List<StreamDef> getStreams() {
++        return streams;
++    }
++
++    public void setStreams(List<StreamDef> streams) {
++        this.streams = streams;
++    }
++
++    public Map<String, Object> getConfig() {
++        return config;
++    }
++
++    public void setConfig(Map<String, Object> config) {
++        this.config = config;
++    }
++
++    /**
++     * @return a new list holding the component definitions in insertion order
++     */
++    public List<BeanDef> getComponents() {
++        return new ArrayList<BeanDef>(this.componentMap.values());
++    }
++
++    /**
++     * Replaces all component definitions, indexing them by id.
++     *
++     * @param components the new components; `null` is treated as an empty list
++     */
++    public void setComponents(List<BeanDef> components) {
++        this.componentMap = new LinkedHashMap<String, BeanDef>();
++        if(components == null){
++            return;
++        }
++        for(BeanDef component : components){
++            this.componentMap.put(component.getId(), component);
++        }
++    }
++
++    public List<IncludeDef> getIncludes() {
++        return includes;
++    }
++
++    public void setIncludes(List<IncludeDef> includes) {
++        this.includes = includes;
++    }
++
++    // utility methods
++
++    /**
++     * @param boltId id of a bolt defined in this topology
++     * @return the parallelism of that bolt
++     * @throws IllegalArgumentException if no bolt with that id is defined
++     */
++    public int parallelismForBolt(String boltId){
++        BoltDef bolt = this.boltMap.get(boltId);
++        if(bolt == null){
++            // fail with the offending id instead of an anonymous NullPointerException
++            throw new IllegalArgumentException("No bolt defined with id '" + boltId + "'.");
++        }
++        return bolt.getParallelism();
++    }
++
++    /** @return the bolt definition with the given id, or `null` if there is none */
++    public BoltDef getBoltDef(String id){
++        return this.boltMap.get(id);
++    }
++
++    /** @return the spout definition with the given id, or `null` if there is none */
++    public SpoutDef getSpoutDef(String id){
++        return this.spoutMap.get(id);
++    }
++
++    /** @return the component definition with the given id, or `null` if there is none */
++    public BeanDef getComponent(String id){
++        return this.componentMap.get(id);
++    }
++
++    // used by includes implementation
++
++    /**
++     * Adds the given bolts. A bolt whose id is already defined is only replaced when
++     * `override` is `true`; otherwise it is skipped with a warning.
++     */
++    public void addAllBolts(List<BoltDef> bolts, boolean override){
++        for(BoltDef bolt : bolts){
++            String id = bolt.getId();
++            if(this.boltMap.get(id) == null || override) {
++                this.boltMap.put(id, bolt);
++            } else {
++                LOG.warn("Ignoring attempt to create bolt '{}' with override == false.", id);
++            }
++        }
++    }
++
++    /**
++     * Adds the given spouts. A spout whose id is already defined is only replaced when
++     * `override` is `true`; otherwise it is skipped with a warning.
++     */
++    public void addAllSpouts(List<SpoutDef> spouts, boolean override){
++        for(SpoutDef spout : spouts){
++            String id = spout.getId();
++            if(this.spoutMap.get(id) == null || override) {
++                this.spoutMap.put(id, spout);
++            } else {
++                LOG.warn("Ignoring attempt to create spout '{}' with override == false.", id);
++            }
++        }
++    }
++
++    /**
++     * Adds the given components. A component whose id is already defined is only replaced
++     * when `override` is `true`; otherwise it is skipped with a warning.
++     */
++    public void addAllComponents(List<BeanDef> components, boolean override) {
++        for(BeanDef bean : components){
++            String id = bean.getId();
++            if(this.componentMap.get(id) == null || override) {
++                this.componentMap.put(id, bean);
++            } else {
++                LOG.warn("Ignoring attempt to create component '{}' with override == false.", id);
++            }
++        }
++    }
++
++    /**
++     * Appends the given streams. The `override` flag is currently ignored (see TODO).
++     */
++    public void addAllStreams(List<StreamDef> streams, boolean override) {
++        //TODO figure out how we want to deal with overrides. Users may want to add streams even when overriding other
++        // properties. For now we just add them blindly which could lead to a potentially invalid topology.
++        if(streams == null){
++            return;
++        }
++        if(this.streams == null){
++            // setStreams(null) may have cleared the list; start a fresh one rather than fail
++            this.streams = new ArrayList<StreamDef>();
++        }
++        this.streams.addAll(streams);
++    }
++
++    public TopologySourceDef getTopologySource() {
++        return topologySource;
++    }
++
++    public void setTopologySource(TopologySourceDef topologySource) {
++        this.topologySource = topologySource;
++    }
++
++    /**
++     * @return `true` if no topology source is set, i.e. the topology is expected to be
++     *         described by spouts, bolts and streams
++     */
++    public boolean isDslTopology(){
++        return this.topologySource == null;
++    }
++
++
++    /**
++     * Checks that a topology source and a DSL topology are not defined at the same time.
++     *
++     * @return `false` if a topology source is set AND at least one spout, bolt or stream is
++     *         defined; `true` in every other case
++     */
++    public boolean validate(){
++        boolean hasSpouts = this.spoutMap != null && this.spoutMap.size() > 0;
++        boolean hasBolts = this.boltMap != null && this.boltMap.size() > 0;
++        boolean hasStreams = this.streams != null && this.streams.size() > 0;
++        // you can't define a topologySource and a DSL topology at the same time...
++        return isDslTopology() || !(hasSpouts || hasBolts || hasStreams);
++    }
++}
http://git-wip-us.apache.org/repos/asf/storm/blob/b21a98dd/external/flux/flux-core/src/main/java/org/apache/storm/flux/model/TopologySourceDef.java
----------------------------------------------------------------------
diff --cc external/flux/flux-core/src/main/java/org/apache/storm/flux/model/TopologySourceDef.java
index 0000000,0000000..d6a2f57
new file mode 100644
--- /dev/null
+++ b/external/flux/flux-core/src/main/java/org/apache/storm/flux/model/TopologySourceDef.java
@@@ -1,0 -1,0 +1,36 @@@
++/*
++ * Licensed to the Apache Software Foundation (ASF) under one
++ * or more contributor license agreements. See the NOTICE file
++ * distributed with this work for additional information
++ * regarding copyright ownership. The ASF licenses this file
++ * to you under the Apache License, Version 2.0 (the
++ * "License"); you may not use this file except in compliance
++ * with the License. You may obtain a copy of the License at
++ *
++ * http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing, software
++ * distributed under the License is distributed on an "AS IS" BASIS,
++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
++ * See the License for the specific language governing permissions and
++ * limitations under the License.
++ */
++package org.apache.storm.flux.model;
++
++/**
++ * An `ObjectDef` that additionally names a method. The method name defaults to
++ * {@link #DEFAULT_METHOD_NAME} until it is changed via {@link #setMethodName(String)}.
++ */
++public class TopologySourceDef extends ObjectDef {
++    public static final String DEFAULT_METHOD_NAME = "getTopology";
++
++    // starts out as the default; see the class comment
++    private String methodName = DEFAULT_METHOD_NAME;
++
++    public TopologySourceDef(){
++    }
++
++    /** @return the configured method name */
++    public String getMethodName() {
++        return this.methodName;
++    }
++
++    public void setMethodName(String methodName) {
++        this.methodName = methodName;
++    }
++}
http://git-wip-us.apache.org/repos/asf/storm/blob/b21a98dd/external/flux/flux-core/src/main/java/org/apache/storm/flux/model/VertexDef.java
----------------------------------------------------------------------
diff --cc external/flux/flux-core/src/main/java/org/apache/storm/flux/model/VertexDef.java
index 0000000,0000000..e71bcc2
new file mode 100644
--- /dev/null
+++ b/external/flux/flux-core/src/main/java/org/apache/storm/flux/model/VertexDef.java
@@@ -1,0 -1,0 +1,36 @@@
++/*
++ * Licensed to the Apache Software Foundation (ASF) under one
++ * or more contributor license agreements. See the NOTICE file
++ * distributed with this work for additional information
++ * regarding copyright ownership. The ASF licenses this file
++ * to you under the Apache License, Version 2.0 (the
++ * "License"); you may not use this file except in compliance
++ * with the License. You may obtain a copy of the License at
++ *
++ * http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing, software
++ * distributed under the License is distributed on an "AS IS" BASIS,
++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
++ * See the License for the specific language governing permissions and
++ * limitations under the License.
++ */
++package org.apache.storm.flux.model;
++
++/**
++ * Abstract parent class of component definitions
++ * (spouts/bolts). Adds a `parallelism` setting on top of `BeanDef`.
++ */
++public abstract class VertexDef extends BeanDef {
++
++    // default parallelism to 1 so if it's omitted, the topology will still function.
++    private int parallelism = 1;
++
++    /** @return the configured parallelism (1 unless changed) */
++    public int getParallelism() {
++        return this.parallelism;
++    }
++
++    public void setParallelism(int parallelism) {
++        this.parallelism = parallelism;
++    }
++}
http://git-wip-us.apache.org/repos/asf/storm/blob/b21a98dd/external/flux/flux-core/src/main/java/org/apache/storm/flux/parser/FluxParser.java
----------------------------------------------------------------------
diff --cc external/flux/flux-core/src/main/java/org/apache/storm/flux/parser/FluxParser.java
index 0000000,0000000..72f8a8e
new file mode 100644
--- /dev/null
+++ b/external/flux/flux-core/src/main/java/org/apache/storm/flux/parser/FluxParser.java
@@@ -1,0 -1,0 +1,202 @@@
++/*
++ * Licensed to the Apache Software Foundation (ASF) under one
++ * or more contributor license agreements. See the NOTICE file
++ * distributed with this work for additional information
++ * regarding copyright ownership. The ASF licenses this file
++ * to you under the Apache License, Version 2.0 (the
++ * "License"); you may not use this file except in compliance
++ * with the License. You may obtain a copy of the License at
++ *
++ * http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing, software
++ * distributed under the License is distributed on an "AS IS" BASIS,
++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
++ * See the License for the specific language governing permissions and
++ * limitations under the License.
++ */
++package org.apache.storm.flux.parser;
++
++import org.apache.storm.flux.api.TopologySource;
++import org.apache.storm.flux.model.BoltDef;
++import org.apache.storm.flux.model.IncludeDef;
++import org.apache.storm.flux.model.SpoutDef;
++import org.apache.storm.flux.model.TopologyDef;
++import org.slf4j.Logger;
++import org.slf4j.LoggerFactory;
++import org.yaml.snakeyaml.TypeDescription;
++import org.yaml.snakeyaml.Yaml;
++import org.yaml.snakeyaml.constructor.Constructor;
++
++import java.io.ByteArrayOutputStream;
++import java.io.FileInputStream;
++import java.io.IOException;
++import java.io.InputStream;
++import java.nio.ByteBuffer;
++import java.util.Map;
++import java.util.Properties;
++
++public class FluxParser {
++ private static final Logger LOG = LoggerFactory.getLogger(FluxParser.class);
++
++ private FluxParser(){}
++
++ // TODO refactor input stream processing (see parseResource() method).
++ public static TopologyDef parseFile(String inputFile, boolean dumpYaml, boolean processIncludes,
++ String propertiesFile, boolean envSub) throws IOException {
++ Yaml yaml = yaml();
++ FileInputStream in = new FileInputStream(inputFile);
++ // TODO process properties, etc.
++ TopologyDef topology = loadYaml(yaml, in, propertiesFile, envSub);
++ in.close();
++ if(dumpYaml){
++ dumpYaml(topology, yaml);
++ }
++ if(processIncludes) {
++ return processIncludes(yaml, topology, propertiesFile, envSub);
++ } else {
++ return topology;
++ }
++ }
++
++ public static TopologyDef parseResource(String resource, boolean dumpYaml, boolean processIncludes,
++ String propertiesFile, boolean envSub) throws IOException {
++ Yaml yaml = yaml();
++ InputStream in = FluxParser.class.getResourceAsStream(resource);
++ if(in == null){
++ LOG.error("Unable to load classpath resource: " + resource);
++ System.exit(1);
++ }
++ TopologyDef topology = loadYaml(yaml, in, propertiesFile, envSub);
++ in.close();
++ if(dumpYaml){
++ dumpYaml(topology, yaml);
++ }
++ if(processIncludes) {
++ return processIncludes(yaml, topology, propertiesFile, envSub);
++ } else {
++ return topology;
++ }
++ }
++
++ private static TopologyDef loadYaml(Yaml yaml, InputStream in, String propsFile, boolean envSubstitution) throws IOException {
++ ByteArrayOutputStream bos = new ByteArrayOutputStream();
++ LOG.info("loading YAML from input stream...");
++ int b = -1;
++ while((b = in.read()) != -1){
++ bos.write(b);
++ }
++
++ // TODO substitution implementation is not exactly efficient or kind to memory...
++ String str = bos.toString();
++ // properties file substitution
++ if(propsFile != null){
++ LOG.info("Performing property substitution.");
++ InputStream propsIn = new FileInputStream(propsFile);
++ Properties props = new Properties();
++ props.load(propsIn);
++ for(Object key : props.keySet()){
++ str = str.replace("${" + key + "}", props.getProperty((String)key));
++ }
++ } else {
++ LOG.info("Not performing property substitution.");
++ }
++
++ // environment variable substitution
++ if(envSubstitution){
++ LOG.info("Performing environment variable substitution...");
++ Map<String, String> envs = System.getenv();
++ for(String key : envs.keySet()){
++ str = str.replace("${ENV-" + key + "}", envs.get(key));
++ }
++ } else {
++ LOG.info("Not performing environment variable substitution.");
++ }
++ return (TopologyDef)yaml.load(str);
++ }
++
++ private static void dumpYaml(TopologyDef topology, Yaml yaml){
++ System.out.println("Configuration (interpreted): \n" + yaml.dump(topology));
++ }
++
++ private static Yaml yaml(){
++ Constructor constructor = new Constructor(TopologyDef.class);
++
++ TypeDescription topologyDescription = new TypeDescription(TopologyDef.class);
++ topologyDescription.putListPropertyType("spouts", SpoutDef.class);
++ topologyDescription.putListPropertyType("bolts", BoltDef.class);
++ topologyDescription.putListPropertyType("includes", IncludeDef.class);
++ constructor.addTypeDescription(topologyDescription);
++
++ Yaml yaml = new Yaml(constructor);
++ return yaml;
++ }
++
++ /**
++ *
++ * @param yaml the yaml parser for parsing the include file(s)
++ * @param topologyDef the topology definition containing (possibly zero) includes
++ * @return The TopologyDef with includes resolved.
++ */
++ private static TopologyDef processIncludes(Yaml yaml, TopologyDef topologyDef, String propsFile, boolean envSub)
++ throws IOException {
++ //TODO support multiple levels of includes
++ if(topologyDef.getIncludes() != null) {
++ for (IncludeDef include : topologyDef.getIncludes()){
++ TopologyDef includeTopologyDef = null;
++ if (include.isResource()) {
++ LOG.info("Loading includes from resource: {}", include.getFile());
++ includeTopologyDef = parseResource(include.getFile(), true, false, propsFile, envSub);
++ } else {
++ LOG.info("Loading includes from file: {}", include.getFile());
++ includeTopologyDef = parseFile(include.getFile(), true, false, propsFile, envSub);
++ }
++
++ // if overrides are disabled, we won't replace anything that already exists
++ boolean override = include.isOverride();
++ // name
++ if(includeTopologyDef.getName() != null){
++ topologyDef.setName(includeTopologyDef.getName(), override);
++ }
++
++ // config
++ if(includeTopologyDef.getConfig() != null) {
++ //TODO move this logic to the model class
++ Map<String, Object> config = topologyDef.getConfig();
++ Map<String, Object> includeConfig = includeTopologyDef.getConfig();
++ if(override) {
++ config.putAll(includeTopologyDef.getConfig());
++ } else {
++ for(String key : includeConfig.keySet()){
++ if(config.containsKey(key)){
++ LOG.warn("Ignoring attempt to set topology config property '{}' with override == false", key);
++ }
++ else {
++ config.put(key, includeConfig.get(key));
++ }
++ }
++ }
++ }
++
++ //component overrides
++ if(includeTopologyDef.getComponents() != null){
++ topologyDef.addAllComponents(includeTopologyDef.getComponents(), override);
++ }
++ //bolt overrides
++ if(includeTopologyDef.getBolts() != null){
++ topologyDef.addAllBolts(includeTopologyDef.getBolts(), override);
++ }
++ //spout overrides
++ if(includeTopologyDef.getSpouts() != null) {
++ topologyDef.addAllSpouts(includeTopologyDef.getSpouts(), override);
++ }
++ //stream overrides
++ //TODO streams should be uniquely identifiable
++ if(includeTopologyDef.getStreams() != null) {
++ topologyDef.addAllStreams(includeTopologyDef.getStreams(), override);
++ }
++ } // end include processing
++ }
++ return topologyDef;
++ }
++}
http://git-wip-us.apache.org/repos/asf/storm/blob/b21a98dd/external/flux/flux-core/src/main/resources/splash.txt
----------------------------------------------------------------------
diff --cc external/flux/flux-core/src/main/resources/splash.txt
index 0000000,0000000..337931a
new file mode 100644
--- /dev/null
+++ b/external/flux/flux-core/src/main/resources/splash.txt
@@@ -1,0 -1,0 +1,9 @@@
++███████╗██╗ ██╗ ██╗██╗ ██╗
++██╔════╝██║ ██║ ██║╚██╗██╔╝
++█████╗ ██║ ██║ ██║ ╚███╔╝
++██╔══╝ ██║ ██║ ██║ ██╔██╗
++██║ ███████╗╚██████╔╝██╔╝ ██╗
++╚═╝ ╚══════╝ ╚═════╝ ╚═╝ ╚═╝
+++- Apache Storm -+
+++- data FLow User eXperience -+
++Version: ${project.version}
http://git-wip-us.apache.org/repos/asf/storm/blob/b21a98dd/external/flux/flux-core/src/test/java/org/apache/storm/flux/FluxBuilderTest.java
----------------------------------------------------------------------
diff --cc external/flux/flux-core/src/test/java/org/apache/storm/flux/FluxBuilderTest.java
index 0000000,0000000..ff67867
new file mode 100644
--- /dev/null
+++ b/external/flux/flux-core/src/test/java/org/apache/storm/flux/FluxBuilderTest.java
@@@ -1,0 -1,0 +1,31 @@@
++/*
++ * Licensed to the Apache Software Foundation (ASF) under one
++ * or more contributor license agreements. See the NOTICE file
++ * distributed with this work for additional information
++ * regarding copyright ownership. The ASF licenses this file
++ * to you under the Apache License, Version 2.0 (the
++ * "License"); you may not use this file except in compliance
++ * with the License. You may obtain a copy of the License at
++ *
++ * http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing, software
++ * distributed under the License is distributed on an "AS IS" BASIS,
++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
++ * See the License for the specific language governing permissions and
++ * limitations under the License.
++ */
++package org.apache.storm.flux;
++
++import org.junit.Test;
++import static org.junit.Assert.*;
++
++/**
++ * Unit tests for the static helper methods exposed by {@code FluxBuilder}.
++ */
++public class FluxBuilderTest {
++
++    /**
++     * Checks {@code FluxBuilder.isPrimitiveNumber} against one type that is expected
++     * to be accepted ({@code int}) and two that are expected to be rejected
++     * ({@code boolean} and {@code String}).
++     */
++    @Test
++    public void testIsPrimitiveNumber() throws Exception {
++        // Each lookup is asserted before the next one runs, so the first failure is reported as-is.
++        final boolean intAccepted = FluxBuilder.isPrimitiveNumber(int.class);
++        assertTrue(intAccepted);
++
++        final boolean booleanAccepted = FluxBuilder.isPrimitiveNumber(boolean.class);
++        assertFalse(booleanAccepted);
++
++        final boolean stringAccepted = FluxBuilder.isPrimitiveNumber(String.class);
++        assertFalse(stringAccepted);
++    }
++}
http://git-wip-us.apache.org/repos/asf/storm/blob/b21a98dd/external/flux/flux-core/src/test/java/org/apache/storm/flux/IntegrationTest.java
----------------------------------------------------------------------
diff --cc external/flux/flux-core/src/test/java/org/apache/storm/flux/IntegrationTest.java
index 0000000,0000000..5e17f5e
new file mode 100644
--- /dev/null
+++ b/external/flux/flux-core/src/test/java/org/apache/storm/flux/IntegrationTest.java
@@@ -1,0 -1,0 +1,41 @@@
++/*
++ * Licensed to the Apache Software Foundation (ASF) under one
++ * or more contributor license agreements. See the NOTICE file
++ * distributed with this work for additional information
++ * regarding copyright ownership. The ASF licenses this file
++ * to you under the Apache License, Version 2.0 (the
++ * "License"); you may not use this file except in compliance
++ * with the License. You may obtain a copy of the License at
++ *
++ * http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing, software
++ * distributed under the License is distributed on an "AS IS" BASIS,
++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
++ * See the License for the specific language governing permissions and
++ * limitations under the License.
++ */
++package org.apache.storm.flux;
++
++import org.junit.Test;
++
++/**
++ * Opt-in integration test that launches {@code Flux.main} with a test configuration.
++ *
++ * <p>The test only runs when the JVM system property {@code skipIntegration} is set to
++ * {@code false} (case-insensitive). In every other case, including when the property is
++ * absent, the test is reported as skipped.
++ */
++public class IntegrationTest {
++
++    /** Name of the system property that enables or disables the integration test. */
++    private static final String SKIP_PROPERTY = "skipIntegration";
++
++    /**
++     * {@code true} unless {@code skipIntegration} is explicitly set to {@code "false"}.
++     * Calling {@code equalsIgnoreCase} on the literal keeps the check null-safe when the
++     * property is not defined.
++     */
++    private static final boolean SKIP_TEST =
++            !"false".equalsIgnoreCase(System.getProperty(SKIP_PROPERTY));
++
++    /**
++     * Invokes {@code Flux.main} with the arguments {@code -s 30000} and the
++     * {@code existing-topology.yaml} test configuration.
++     *
++     * @throws Exception if {@code Flux.main} fails
++     */
++    @Test
++    public void testRunTopologySource() throws Exception {
++        // Use a JUnit assumption so a disabled run is reported as "skipped" rather than
++        // as a pass that never exercised anything.
++        org.junit.Assume.assumeTrue(!SKIP_TEST);
++
++        Flux.main(new String[]{"-s", "30000", "src/test/resources/configs/existing-topology.yaml"});
++    }
++}