You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by ka...@apache.org on 2017/04/05 23:19:23 UTC
[20/23] storm git commit: STORM-2453 Move non-connectors into the top
directory
http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/external/flux/flux-core/src/main/java/org/apache/storm/flux/FluxBuilder.java
----------------------------------------------------------------------
diff --git a/external/flux/flux-core/src/main/java/org/apache/storm/flux/FluxBuilder.java b/external/flux/flux-core/src/main/java/org/apache/storm/flux/FluxBuilder.java
deleted file mode 100644
index e79dfb7..0000000
--- a/external/flux/flux-core/src/main/java/org/apache/storm/flux/FluxBuilder.java
+++ /dev/null
@@ -1,630 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.flux;
-
-import org.apache.storm.Config;
-import org.apache.storm.generated.StormTopology;
-import org.apache.storm.grouping.CustomStreamGrouping;
-import org.apache.storm.topology.*;
-import org.apache.storm.tuple.Fields;
-import org.apache.storm.utils.Utils;
-import org.apache.storm.flux.model.*;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.lang.reflect.*;
-import java.util.*;
-
-public class FluxBuilder {
- private static Logger LOG = LoggerFactory.getLogger(FluxBuilder.class);
-
- /**
- * Given a topology definition, return a populated `org.apache.storm.Config` instance.
- *
- * @param topologyDef
- * @return
- */
- public static Config buildConfig(TopologyDef topologyDef) {
- // merge contents of `config` into topology config
- Config conf = new Config();
- conf.putAll(topologyDef.getConfig());
- return conf;
- }
-
- /**
- * Given a topology definition, return a Storm topology that can be run either locally or remotely.
- *
- * @param context
- * @return
- * @throws IllegalAccessException
- * @throws InstantiationException
- * @throws ClassNotFoundException
- * @throws NoSuchMethodException
- * @throws InvocationTargetException
- */
- public static StormTopology buildTopology(ExecutionContext context) throws IllegalAccessException,
- InstantiationException, ClassNotFoundException, NoSuchMethodException, InvocationTargetException, NoSuchFieldException {
-
- StormTopology topology = null;
- TopologyDef topologyDef = context.getTopologyDef();
-
- if(!topologyDef.validate()){
- throw new IllegalArgumentException("Invalid topology config. Spouts, bolts and streams cannot be " +
- "defined in the same configuration as a topologySource.");
- }
-
- // build components that may be referenced by spouts, bolts, etc.
- // the map will be a String --> Object where the object is a fully
- // constructed class instance
- buildComponents(context);
-
- if(topologyDef.isDslTopology()) {
- // This is a DSL (YAML, etc.) topology...
- LOG.info("Detected DSL topology...");
-
- TopologyBuilder builder = new TopologyBuilder();
-
- // create spouts
- buildSpouts(context, builder);
-
- // we need to be able to lookup bolts by id, then switch based
- // on whether they are IBasicBolt or IRichBolt instances
- buildBolts(context);
-
- // process stream definitions
- buildStreamDefinitions(context, builder);
-
- topology = builder.createTopology();
- } else {
- // user class supplied...
- // this also provides a bridge to Trident...
- LOG.info("A topology source has been specified...");
- ObjectDef def = topologyDef.getTopologySource();
- topology = buildExternalTopology(def, context);
- }
- return topology;
- }
-
- /**
- * Given a `java.lang.Object` instance and a method name, attempt to find a method that matches the input
- * parameter: `java.util.Map` or `org.apache.storm.Config`.
- *
- * @param topologySource object to inspect for the specified method
- * @param methodName name of the method to look for
- * @return
- * @throws NoSuchMethodException
- */
- private static Method findGetTopologyMethod(Object topologySource, String methodName) throws NoSuchMethodException {
- Class clazz = topologySource.getClass();
- Method[] methods = clazz.getMethods();
- ArrayList<Method> candidates = new ArrayList<Method>();
- for(Method method : methods){
- if(!method.getName().equals(methodName)){
- continue;
- }
- if(!method.getReturnType().equals(StormTopology.class)){
- continue;
- }
- Class[] paramTypes = method.getParameterTypes();
- if(paramTypes.length != 1){
- continue;
- }
- if(paramTypes[0].isAssignableFrom(Map.class) || paramTypes[0].isAssignableFrom(Config.class)){
- candidates.add(method);
- }
- }
-
- if(candidates.size() == 0){
- throw new IllegalArgumentException("Unable to find method '" + methodName + "' method in class: " + clazz.getName());
- } else if (candidates.size() > 1){
- LOG.warn("Found multiple candidate methods in class '" + clazz.getName() + "'. Using the first one found");
- }
-
- return candidates.get(0);
- }
-
- /**
- * @param context
- * @param builder
- */
- private static void buildStreamDefinitions(ExecutionContext context, TopologyBuilder builder)
- throws ClassNotFoundException, NoSuchMethodException, InvocationTargetException, InstantiationException,
- IllegalAccessException, NoSuchFieldException {
- TopologyDef topologyDef = context.getTopologyDef();
- // process stream definitions
- HashMap<String, BoltDeclarer> declarers = new HashMap<String, BoltDeclarer>();
- for (StreamDef stream : topologyDef.getStreams()) {
- Object boltObj = context.getBolt(stream.getTo());
- BoltDeclarer declarer = declarers.get(stream.getTo());
- if (boltObj instanceof IRichBolt) {
- if(declarer == null) {
- declarer = builder.setBolt(stream.getTo(),
- (IRichBolt) boltObj,
- topologyDef.parallelismForBolt(stream.getTo()));
- declarers.put(stream.getTo(), declarer);
- }
- } else if (boltObj instanceof IBasicBolt) {
- if(declarer == null) {
- declarer = builder.setBolt(
- stream.getTo(),
- (IBasicBolt) boltObj,
- topologyDef.parallelismForBolt(stream.getTo()));
- declarers.put(stream.getTo(), declarer);
- }
- } else if (boltObj instanceof IWindowedBolt) {
- if(declarer == null) {
- declarer = builder.setBolt(
- stream.getTo(),
- (IWindowedBolt) boltObj,
- topologyDef.parallelismForBolt(stream.getTo()));
- declarers.put(stream.getTo(), declarer);
- }
- } else if (boltObj instanceof IStatefulBolt) {
- if(declarer == null) {
- declarer = builder.setBolt(
- stream.getTo(),
- (IStatefulBolt) boltObj,
- topologyDef.parallelismForBolt(stream.getTo()));
- declarers.put(stream.getTo(), declarer);
- }
- } else {
- throw new IllegalArgumentException("Class does not appear to be a bolt: " +
- boltObj.getClass().getName());
- }
-
- GroupingDef grouping = stream.getGrouping();
- // if the streamId is defined, use it for the grouping, otherwise assume storm's default stream
- String streamId = (grouping.getStreamId() == null ? Utils.DEFAULT_STREAM_ID : grouping.getStreamId());
-
-
- switch (grouping.getType()) {
- case SHUFFLE:
- declarer.shuffleGrouping(stream.getFrom(), streamId);
- break;
- case FIELDS:
- //TODO check for null grouping args
- declarer.fieldsGrouping(stream.getFrom(), streamId, new Fields(grouping.getArgs()));
- break;
- case ALL:
- declarer.allGrouping(stream.getFrom(), streamId);
- break;
- case DIRECT:
- declarer.directGrouping(stream.getFrom(), streamId);
- break;
- case GLOBAL:
- declarer.globalGrouping(stream.getFrom(), streamId);
- break;
- case LOCAL_OR_SHUFFLE:
- declarer.localOrShuffleGrouping(stream.getFrom(), streamId);
- break;
- case NONE:
- declarer.noneGrouping(stream.getFrom(), streamId);
- break;
- case CUSTOM:
- declarer.customGrouping(stream.getFrom(), streamId,
- buildCustomStreamGrouping(stream.getGrouping().getCustomClass(), context));
- break;
- default:
- throw new UnsupportedOperationException("unsupported grouping type: " + grouping);
- }
- }
- }
-
- private static void applyProperties(ObjectDef bean, Object instance, ExecutionContext context) throws
- IllegalAccessException, InvocationTargetException, NoSuchFieldException {
- List<PropertyDef> props = bean.getProperties();
- Class clazz = instance.getClass();
- if (props != null) {
- for (PropertyDef prop : props) {
- Object value = prop.isReference() ? context.getComponent(prop.getRef()) : prop.getValue();
- Method setter = findSetter(clazz, prop.getName(), value);
- if (setter != null) {
- LOG.debug("found setter, attempting to invoke");
- // invoke setter
- setter.invoke(instance, new Object[]{value});
- } else {
- // look for a public instance variable
- LOG.debug("no setter found. Looking for a public instance variable...");
- Field field = findPublicField(clazz, prop.getName(), value);
- if (field != null) {
- field.set(instance, value);
- }
- }
- }
- }
- }
-
- private static Field findPublicField(Class clazz, String property, Object arg) throws NoSuchFieldException {
- Field field = clazz.getField(property);
- return field;
- }
-
- private static Method findSetter(Class clazz, String property, Object arg) {
- String setterName = toSetterName(property);
- Method retval = null;
- Method[] methods = clazz.getMethods();
- for (Method method : methods) {
- if (setterName.equals(method.getName())) {
- LOG.debug("Found setter method: " + method.getName());
- retval = method;
- }
- }
- return retval;
- }
-
- private static String toSetterName(String name) {
- return "set" + name.substring(0, 1).toUpperCase() + name.substring(1, name.length());
- }
-
- private static List<Object> resolveReferences(List<Object> args, ExecutionContext context) {
- LOG.debug("Checking arguments for references.");
- List<Object> cArgs = new ArrayList<Object>();
- // resolve references
- for (Object arg : args) {
- if (arg instanceof BeanReference) {
- cArgs.add(context.getComponent(((BeanReference) arg).getId()));
- } else if (arg instanceof BeanListReference) {
- List<Object> components = new ArrayList<>();
- BeanListReference ref = (BeanListReference) arg;
- for (String id : ref.getIds()) {
- components.add(context.getComponent(id));
- }
-
- LOG.debug("BeanListReference resolved as {}", components);
- cArgs.add(components);
- } else {
- cArgs.add(arg);
- }
- }
- return cArgs;
- }
-
- private static Object buildObject(ObjectDef def, ExecutionContext context) throws ClassNotFoundException,
- IllegalAccessException, InstantiationException, NoSuchMethodException, InvocationTargetException, NoSuchFieldException {
- Class clazz = Class.forName(def.getClassName());
- Object obj = null;
- if (def.hasConstructorArgs()) {
- LOG.debug("Found constructor arguments in definition: " + def.getConstructorArgs().getClass().getName());
- List<Object> cArgs = def.getConstructorArgs();
- if(def.hasReferences()){
- cArgs = resolveReferences(cArgs, context);
- }
- Constructor con = findCompatibleConstructor(cArgs, clazz);
- if (con != null) {
- LOG.debug("Found something seemingly compatible, attempting invocation...");
- obj = con.newInstance(getArgsWithListCoercian(cArgs, con.getParameterTypes()));
- } else {
- String msg = String.format("Couldn't find a suitable constructor for class '%s' with arguments '%s'.",
- clazz.getName(),
- cArgs);
- throw new IllegalArgumentException(msg);
- }
- } else {
- obj = clazz.newInstance();
- }
- applyProperties(def, obj, context);
- invokeConfigMethods(def, obj, context);
- return obj;
- }
-
- private static StormTopology buildExternalTopology(ObjectDef def, ExecutionContext context)
- throws ClassNotFoundException, IllegalAccessException, InstantiationException, NoSuchMethodException,
- InvocationTargetException, NoSuchFieldException {
-
- Object topologySource = buildObject(def, context);
-
- String methodName = context.getTopologyDef().getTopologySource().getMethodName();
- Method getTopology = findGetTopologyMethod(topologySource, methodName);
- if(getTopology.getParameterTypes()[0].equals(Config.class)){
- Config config = new Config();
- config.putAll(context.getTopologyDef().getConfig());
- return (StormTopology) getTopology.invoke(topologySource, config);
- } else {
- return (StormTopology) getTopology.invoke(topologySource, context.getTopologyDef().getConfig());
- }
- }
-
- private static CustomStreamGrouping buildCustomStreamGrouping(ObjectDef def, ExecutionContext context)
- throws ClassNotFoundException,
- IllegalAccessException, InstantiationException, NoSuchMethodException, InvocationTargetException, NoSuchFieldException {
- Object grouping = buildObject(def, context);
- return (CustomStreamGrouping)grouping;
- }
-
- /**
- * Given a topology definition, resolve and instantiate all components found and return a map
- * keyed by the component id.
- */
- private static void buildComponents(ExecutionContext context) throws ClassNotFoundException, NoSuchMethodException,
- IllegalAccessException, InvocationTargetException, InstantiationException, NoSuchFieldException {
- Collection<BeanDef> cDefs = context.getTopologyDef().getComponents();
- if (cDefs != null) {
- for (BeanDef bean : cDefs) {
- Object obj = buildObject(bean, context);
- context.addComponent(bean.getId(), obj);
- }
- }
- }
-
-
- private static void buildSpouts(ExecutionContext context, TopologyBuilder builder) throws ClassNotFoundException,
- NoSuchMethodException, InvocationTargetException, InstantiationException, IllegalAccessException, NoSuchFieldException {
- for (SpoutDef sd : context.getTopologyDef().getSpouts()) {
- IRichSpout spout = buildSpout(sd, context);
- builder.setSpout(sd.getId(), spout, sd.getParallelism());
- context.addSpout(sd.getId(), spout);
- }
- }
-
- /**
- * Given a spout definition, return a Storm spout implementation by attempting to find a matching constructor
- * in the given spout class. Perform list to array conversion as necessary.
- */
- private static IRichSpout buildSpout(SpoutDef def, ExecutionContext context) throws ClassNotFoundException,
- IllegalAccessException, InstantiationException, NoSuchMethodException, InvocationTargetException, NoSuchFieldException {
- return (IRichSpout)buildObject(def, context);
- }
-
- /**
- * Given a list of bolt definitions, build a map of Storm bolts with the bolt definition id as the key.
- * Attempt to coerce the given constructor arguments to a matching bolt constructor as much as possible.
- */
- private static void buildBolts(ExecutionContext context) throws ClassNotFoundException, IllegalAccessException,
- InstantiationException, NoSuchMethodException, InvocationTargetException, NoSuchFieldException {
- for (BoltDef def : context.getTopologyDef().getBolts()) {
- Class clazz = Class.forName(def.getClassName());
- Object bolt = buildObject(def, context);
- context.addBolt(def.getId(), bolt);
- }
- }
-
- /**
- * Given a list of constructor arguments, and a target class, attempt to find a suitable constructor.
- *
- */
- private static Constructor findCompatibleConstructor(List<Object> args, Class target) throws NoSuchMethodException {
- Constructor retval = null;
- int eligibleCount = 0;
-
- LOG.debug("Target class: {}, constructor args: {}", target.getName(), args);
- Constructor[] cons = target.getDeclaredConstructors();
-
- for (Constructor con : cons) {
- Class[] paramClasses = con.getParameterTypes();
- if (paramClasses.length == args.size()) {
- LOG.debug("found constructor with same number of args..");
- boolean invokable = canInvokeWithArgs(args, con.getParameterTypes());
- if (invokable) {
- retval = con;
- eligibleCount++;
- }
- LOG.debug("** invokable --> {}", invokable);
- } else {
- LOG.debug("Skipping constructor with wrong number of arguments.");
- }
- }
- if (eligibleCount > 1) {
- LOG.warn("Found multiple invokable constructors for class {}, given arguments {}. Using the last one found.",
- target, args);
- }
- return retval;
- }
-
-
- public static void invokeConfigMethods(ObjectDef bean, Object instance, ExecutionContext context)
- throws InvocationTargetException, IllegalAccessException {
-
- List<ConfigMethodDef> methodDefs = bean.getConfigMethods();
- if(methodDefs == null || methodDefs.size() == 0){
- return;
- }
- Class clazz = instance.getClass();
- for(ConfigMethodDef methodDef : methodDefs){
- List<Object> args = methodDef.getArgs();
- if (args == null){
- args = new ArrayList();
- }
- if(methodDef.hasReferences()){
- args = resolveReferences(args, context);
- }
- String methodName = methodDef.getName();
- Method method = findCompatibleMethod(args, clazz, methodName);
- if(method != null) {
- Object[] methodArgs = getArgsWithListCoercian(args, method.getParameterTypes());
- method.invoke(instance, methodArgs);
- } else {
- String msg = String.format("Unable to find configuration method '%s' in class '%s' with arguments %s.",
- new Object[]{methodName, clazz.getName(), args});
- throw new IllegalArgumentException(msg);
- }
- }
- }
-
- private static Method findCompatibleMethod(List<Object> args, Class target, String methodName){
- Method retval = null;
- int eligibleCount = 0;
-
- LOG.debug("Target class: {}, methodName: {}, args: {}", target.getName(), methodName, args);
- Method[] methods = target.getMethods();
-
- for (Method method : methods) {
- Class[] paramClasses = method.getParameterTypes();
- if (paramClasses.length == args.size() && method.getName().equals(methodName)) {
- LOG.debug("found constructor with same number of args..");
- boolean invokable = false;
- if (args.size() == 0){
- // it's a method with zero args
- invokable = true;
- } else {
- invokable = canInvokeWithArgs(args, method.getParameterTypes());
- }
- if (invokable) {
- retval = method;
- eligibleCount++;
- }
- LOG.debug("** invokable --> {}", invokable);
- } else {
- LOG.debug("Skipping method with wrong number of arguments.");
- }
- }
- if (eligibleCount > 1) {
- LOG.warn("Found multiple invokable methods for class {}, method {}, given arguments {}. " +
- "Using the last one found.",
- new Object[]{target, methodName, args});
- }
- return retval;
- }
-
- /**
- * Given a java.util.List of contructor/method arguments, and a list of parameter types, attempt to convert the
- * list to an java.lang.Object array that can be used to invoke the constructor. If an argument needs
- * to be coerced from a List to an Array, do so.
- */
- private static Object[] getArgsWithListCoercian(List<Object> args, Class[] parameterTypes) {
-// Class[] parameterTypes = constructor.getParameterTypes();
- if (parameterTypes.length != args.size()) {
- throw new IllegalArgumentException("Contructor parameter count does not egual argument size.");
- }
- Object[] constructorParams = new Object[args.size()];
-
- // loop through the arguments, if we hit a list that has to be convered to an array,
- // perform the conversion
- for (int i = 0; i < args.size(); i++) {
- Object obj = args.get(i);
- Class paramType = parameterTypes[i];
- Class objectType = obj.getClass();
- LOG.debug("Comparing parameter class {} to object class {} to see if assignment is possible.",
- paramType, objectType);
- if (paramType.equals(objectType)) {
- LOG.debug("They are the same class.");
- constructorParams[i] = args.get(i);
- continue;
- }
- if (paramType.isAssignableFrom(objectType)) {
- LOG.debug("Assignment is possible.");
- constructorParams[i] = args.get(i);
- continue;
- }
- if (isPrimitiveBoolean(paramType) && Boolean.class.isAssignableFrom(objectType)){
- LOG.debug("Its a primitive boolean.");
- Boolean bool = (Boolean)args.get(i);
- constructorParams[i] = bool.booleanValue();
- continue;
- }
- if(isPrimitiveNumber(paramType) && Number.class.isAssignableFrom(objectType)){
- LOG.debug("Its a primitive number.");
- Number num = (Number)args.get(i);
- if(paramType == Float.TYPE){
- constructorParams[i] = num.floatValue();
- } else if (paramType == Double.TYPE) {
- constructorParams[i] = num.doubleValue();
- } else if (paramType == Long.TYPE) {
- constructorParams[i] = num.longValue();
- } else if (paramType == Integer.TYPE) {
- constructorParams[i] = num.intValue();
- } else if (paramType == Short.TYPE) {
- constructorParams[i] = num.shortValue();
- } else if (paramType == Byte.TYPE) {
- constructorParams[i] = num.byteValue();
- } else {
- constructorParams[i] = args.get(i);
- }
- continue;
- }
-
- // enum conversion
- if(paramType.isEnum() && objectType.equals(String.class)){
- LOG.debug("Yes, will convert a String to enum");
- constructorParams[i] = Enum.valueOf(paramType, (String)args.get(i));
- continue;
- }
-
- // List to array conversion
- if (paramType.isArray() && List.class.isAssignableFrom(objectType)) {
- // TODO more collection content type checking
- LOG.debug("Conversion appears possible...");
- List list = (List) obj;
- LOG.debug("Array Type: {}, List type: {}", paramType.getComponentType(), list.get(0).getClass());
-
- // create an array of the right type
- Object newArrayObj = Array.newInstance(paramType.getComponentType(), list.size());
- for (int j = 0; j < list.size(); j++) {
- Array.set(newArrayObj, j, list.get(j));
-
- }
- constructorParams[i] = newArrayObj;
- LOG.debug("After conversion: {}", constructorParams[i]);
- }
- }
- return constructorParams;
- }
-
-
- /**
- * Determine if the given constructor/method parameter types are compatible given arguments List. Consider if
- * list coercian can make it possible.
- *
- * @param args
- * @param parameterTypes
- * @return
- */
- private static boolean canInvokeWithArgs(List<Object> args, Class[] parameterTypes) {
- if (parameterTypes.length != args.size()) {
- LOG.warn("parameter types were the wrong size");
- return false;
- }
-
- for (int i = 0; i < args.size(); i++) {
- Object obj = args.get(i);
- if (obj == null) {
- throw new IllegalArgumentException("argument shouldn't be null - index: " + i);
- }
- Class paramType = parameterTypes[i];
- Class objectType = obj.getClass();
- LOG.debug("Comparing parameter class {} to object class {} to see if assignment is possible.",
- paramType, objectType);
- if (paramType.equals(objectType)) {
- LOG.debug("Yes, they are the same class.");
- } else if (paramType.isAssignableFrom(objectType)) {
- LOG.debug("Yes, assignment is possible.");
- } else if (isPrimitiveBoolean(paramType) && Boolean.class.isAssignableFrom(objectType)){
- LOG.debug("Yes, assignment is possible.");
- } else if(isPrimitiveNumber(paramType) && Number.class.isAssignableFrom(objectType)){
- LOG.debug("Yes, assignment is possible.");
- } else if(paramType.isEnum() && objectType.equals(String.class)){
- LOG.debug("Yes, will convert a String to enum");
- } else if (paramType.isArray() && List.class.isAssignableFrom(objectType)) {
- // TODO more collection content type checking
- LOG.debug("Assignment is possible if we convert a List to an array.");
- LOG.debug("Array Type: {}, List type: {}", paramType.getComponentType(), ((List) obj).get(0).getClass());
- } else {
- return false;
- }
- }
- return true;
- }
-
- public static boolean isPrimitiveNumber(Class clazz){
- return clazz.isPrimitive() && !clazz.equals(boolean.class);
- }
-
- public static boolean isPrimitiveBoolean(Class clazz){
- return clazz.isPrimitive() && clazz.equals(boolean.class);
- }
-}
-
http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/external/flux/flux-core/src/main/java/org/apache/storm/flux/api/TopologySource.java
----------------------------------------------------------------------
diff --git a/external/flux/flux-core/src/main/java/org/apache/storm/flux/api/TopologySource.java b/external/flux/flux-core/src/main/java/org/apache/storm/flux/api/TopologySource.java
deleted file mode 100644
index 2777854..0000000
--- a/external/flux/flux-core/src/main/java/org/apache/storm/flux/api/TopologySource.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.flux.api;
-
-
-import org.apache.storm.generated.StormTopology;
-
-import java.util.Map;
-
-/**
- * Marker interface for objects that can produce `StormTopology` objects.
- *
- * If a `topology-source` class implements the `getTopology()` method, Flux will
- * call that method. Otherwise, it will introspect the given class and look for a
- * similar method that produces a `StormTopology` instance.
- *
- * Note that it is not strictly necessary for a class to implement this interface.
- * If a class defines a method with a similar signature, Flux should be able to find
- * and invoke it.
- *
- */
-public interface TopologySource {
- public StormTopology getTopology(Map<String, Object> config);
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/external/flux/flux-core/src/main/java/org/apache/storm/flux/model/BeanDef.java
----------------------------------------------------------------------
diff --git a/external/flux/flux-core/src/main/java/org/apache/storm/flux/model/BeanDef.java b/external/flux/flux-core/src/main/java/org/apache/storm/flux/model/BeanDef.java
deleted file mode 100644
index 72ca5ae..0000000
--- a/external/flux/flux-core/src/main/java/org/apache/storm/flux/model/BeanDef.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.flux.model;
-
-import java.util.ArrayList;
-import java.util.LinkedHashMap;
-import java.util.List;
-import java.util.Map;
-
-/**
- * A representation of a Java object that is uniquely identifyable, and given a className, constructor arguments,
- * and properties, can be instantiated.
- */
-public class BeanDef extends ObjectDef {
- private String id;
-
- public String getId() {
- return id;
- }
-
- public void setId(String id) {
- this.id = id;
- }
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/external/flux/flux-core/src/main/java/org/apache/storm/flux/model/BeanListReference.java
----------------------------------------------------------------------
diff --git a/external/flux/flux-core/src/main/java/org/apache/storm/flux/model/BeanListReference.java b/external/flux/flux-core/src/main/java/org/apache/storm/flux/model/BeanListReference.java
deleted file mode 100644
index 652210c..0000000
--- a/external/flux/flux-core/src/main/java/org/apache/storm/flux/model/BeanListReference.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.flux.model;
-
-import java.util.List;
-
-/**
- * A bean list reference is a list of bean reference.
- */
-public class BeanListReference {
- public List<String> ids;
-
- public BeanListReference(){}
-
- public BeanListReference(List<String> ids){
- this.ids = ids;
- }
-
- public List<String> getIds() {
- return ids;
- }
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/external/flux/flux-core/src/main/java/org/apache/storm/flux/model/BeanReference.java
----------------------------------------------------------------------
diff --git a/external/flux/flux-core/src/main/java/org/apache/storm/flux/model/BeanReference.java b/external/flux/flux-core/src/main/java/org/apache/storm/flux/model/BeanReference.java
deleted file mode 100644
index bd236f1..0000000
--- a/external/flux/flux-core/src/main/java/org/apache/storm/flux/model/BeanReference.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.flux.model;
-
-/**
- * A bean reference is simply a string pointer to another id.
- */
-public class BeanReference {
- public String id;
-
- public BeanReference(){}
-
- public BeanReference(String id){
- this.id = id;
- }
-
- public String getId() {
- return id;
- }
-
- public void setId(String id) {
- this.id = id;
- }
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/external/flux/flux-core/src/main/java/org/apache/storm/flux/model/BoltDef.java
----------------------------------------------------------------------
diff --git a/external/flux/flux-core/src/main/java/org/apache/storm/flux/model/BoltDef.java b/external/flux/flux-core/src/main/java/org/apache/storm/flux/model/BoltDef.java
deleted file mode 100644
index 362abf1..0000000
--- a/external/flux/flux-core/src/main/java/org/apache/storm/flux/model/BoltDef.java
+++ /dev/null
@@ -1,24 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.flux.model;
-
-/**
- * Definition of a Storm bolt.
- *
- * Declares no members of its own; everything is inherited from {@link VertexDef}.
- */
-public class BoltDef extends VertexDef {
-    // intentionally empty
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/external/flux/flux-core/src/main/java/org/apache/storm/flux/model/ConfigMethodDef.java
----------------------------------------------------------------------
diff --git a/external/flux/flux-core/src/main/java/org/apache/storm/flux/model/ConfigMethodDef.java b/external/flux/flux-core/src/main/java/org/apache/storm/flux/model/ConfigMethodDef.java
deleted file mode 100644
index 69cabc3..0000000
--- a/external/flux/flux-core/src/main/java/org/apache/storm/flux/model/ConfigMethodDef.java
+++ /dev/null
@@ -1,67 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.flux.model;
-
-import java.util.ArrayList;
-import java.util.LinkedHashMap;
-import java.util.List;
-import java.util.Map;
-
-/**
- * Bean holding a method name together with the list of arguments for it.
- *
- * When the arguments are set, any argument that is a single-entry map keyed by "ref" or
- * "reflist" is replaced by a {@link BeanReference} or {@link BeanListReference}.
- */
-public class ConfigMethodDef {
-    private String name;
-    private List<Object> args;
-    private boolean hasReferences = false;
-
-    public String getName() {
-        return name;
-    }
-
-    public void setName(String name) {
-        this.name = name;
-    }
-
-    /**
-     * @return the converted argument list, or null if none has been set
-     */
-    public List<Object> getArgs() {
-        return args;
-    }
-
-    /**
-     * Stores a converted copy of the given arguments.
-     *
-     * Each element that is a {@link LinkedHashMap} with exactly one entry keyed "ref" or
-     * "reflist" is swapped for the corresponding reference object; every other element is
-     * kept as-is.
-     *
-     * @param args the raw arguments; null is accepted and clears the stored list
-     */
-    public void setArgs(List<Object> args) {
-        // Recomputed on every call so the flag never describes a previous argument list.
-        boolean foundReference = false;
-        List<Object> converted = null;
-        if (args != null) {
-            converted = new ArrayList<Object>(args.size());
-            for (Object arg : args) {
-                Object resolved = toReference(arg);
-                if (resolved != arg) {
-                    foundReference = true;
-                }
-                converted.add(resolved);
-            }
-        }
-        this.args = converted;
-        this.hasReferences = foundReference;
-    }
-
-    /**
-     * @return true if the most recently set argument list contained a "ref" or "reflist" entry
-     */
-    public boolean hasReferences() {
-        return this.hasReferences;
-    }
-
-    // Returns a reference object for a single-entry "ref"/"reflist" map, otherwise the argument itself.
-    private static Object toReference(Object arg) {
-        if (!(arg instanceof LinkedHashMap)) {
-            return arg;
-        }
-        Map<?, ?> map = (Map<?, ?>) arg;
-        if (map.size() != 1) {
-            return arg;
-        }
-        if (map.containsKey("ref")) {
-            return new BeanReference((String) map.get("ref"));
-        }
-        if (map.containsKey("reflist")) {
-            // The value is trusted to be a list of ids, exactly as the original cast assumed.
-            @SuppressWarnings("unchecked")
-            List<String> ids = (List<String>) map.get("reflist");
-            return new BeanListReference(ids);
-        }
-        return arg;
-    }
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/external/flux/flux-core/src/main/java/org/apache/storm/flux/model/ExecutionContext.java
----------------------------------------------------------------------
diff --git a/external/flux/flux-core/src/main/java/org/apache/storm/flux/model/ExecutionContext.java b/external/flux/flux-core/src/main/java/org/apache/storm/flux/model/ExecutionContext.java
deleted file mode 100644
index 1520006..0000000
--- a/external/flux/flux-core/src/main/java/org/apache/storm/flux/model/ExecutionContext.java
+++ /dev/null
@@ -1,77 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.flux.model;
-
-import org.apache.storm.Config;
-import org.apache.storm.task.IBolt;
-import org.apache.storm.topology.IRichSpout;
-
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-/**
- * Container for the objects needed while instantiating a topology: the parsed topology
- * definition, the Storm config, and id-indexed registries of components, spouts and bolts.
- */
-public class ExecutionContext {
-    // parsed topology definition
-    TopologyDef topologyDef;
-
-    // Storm config; stored by the constructor, no accessor is provided by this class
-    private Config config;
-
-    // components, indexed by id
-    private Map<String, Object> componentMap = new HashMap<String, Object>();
-
-    // spouts, indexed by id
-    private Map<String, IRichSpout> spoutMap = new HashMap<String, IRichSpout>();
-
-    // bolts, indexed by id
-    private Map<String, Object> boltMap = new HashMap<String, Object>();
-
-    /**
-     * @param topologyDef the parsed topology definition
-     * @param config the Storm config
-     */
-    public ExecutionContext(TopologyDef topologyDef, Config config) {
-        this.topologyDef = topologyDef;
-        this.config = config;
-    }
-
-    public TopologyDef getTopologyDef() {
-        return this.topologyDef;
-    }
-
-    /**
-     * Registers a spout under the given id, replacing any previous entry for that id.
-     */
-    public void addSpout(String id, IRichSpout spout) {
-        this.spoutMap.put(id, spout);
-    }
-
-    /**
-     * Registers a bolt under the given id, replacing any previous entry for that id.
-     */
-    public void addBolt(String id, Object bolt) {
-        this.boltMap.put(id, bolt);
-    }
-
-    /**
-     * @return the bolt registered under the id, or null if there is none
-     */
-    public Object getBolt(String id) {
-        return this.boltMap.get(id);
-    }
-
-    /**
-     * Registers a component under the given id, replacing any previous entry for that id.
-     */
-    public void addComponent(String id, Object value) {
-        this.componentMap.put(id, value);
-    }
-
-    /**
-     * @return the component registered under the id, or null if there is none
-     */
-    public Object getComponent(String id) {
-        return this.componentMap.get(id);
-    }
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/external/flux/flux-core/src/main/java/org/apache/storm/flux/model/GroupingDef.java
----------------------------------------------------------------------
diff --git a/external/flux/flux-core/src/main/java/org/apache/storm/flux/model/GroupingDef.java b/external/flux/flux-core/src/main/java/org/apache/storm/flux/model/GroupingDef.java
deleted file mode 100644
index e4fac8e..0000000
--- a/external/flux/flux-core/src/main/java/org/apache/storm/flux/model/GroupingDef.java
+++ /dev/null
@@ -1,77 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.flux.model;
-
-import java.util.List;
-
-/**
- * Bean describing a Storm stream grouping: its type, an optional stream id, optional
- * arguments, and an optional custom grouping class definition.
- */
-public class GroupingDef {
-
-    /**
-     * The kinds of stream grouping that can be declared.
-     */
-    public enum Type {
-        ALL,
-        CUSTOM,
-        DIRECT,
-        SHUFFLE,
-        LOCAL_OR_SHUFFLE,
-        FIELDS,
-        GLOBAL,
-        NONE
-    }
-
-    private Type type;
-    private String streamId;
-    private List<String> args;
-    private ObjectDef customClass;
-
-    /** @return the grouping type */
-    public Type getType() {
-        return this.type;
-    }
-
-    /** @param type the grouping type */
-    public void setType(Type type) {
-        this.type = type;
-    }
-
-    /** @return the stream id */
-    public String getStreamId() {
-        return this.streamId;
-    }
-
-    /** @param streamId the stream id */
-    public void setStreamId(String streamId) {
-        this.streamId = streamId;
-    }
-
-    /** @return the grouping arguments */
-    public List<String> getArgs() {
-        return this.args;
-    }
-
-    /** @param args the grouping arguments */
-    public void setArgs(List<String> args) {
-        this.args = args;
-    }
-
-    /** @return the definition of the custom grouping class */
-    public ObjectDef getCustomClass() {
-        return this.customClass;
-    }
-
-    /** @param customClass the definition of the custom grouping class */
-    public void setCustomClass(ObjectDef customClass) {
-        this.customClass = customClass;
-    }
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/external/flux/flux-core/src/main/java/org/apache/storm/flux/model/IncludeDef.java
----------------------------------------------------------------------
diff --git a/external/flux/flux-core/src/main/java/org/apache/storm/flux/model/IncludeDef.java b/external/flux/flux-core/src/main/java/org/apache/storm/flux/model/IncludeDef.java
deleted file mode 100644
index 23fd9d2..0000000
--- a/external/flux/flux-core/src/main/java/org/apache/storm/flux/model/IncludeDef.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.flux.model;
-
-/**
- * Describes one include, which is either a file or a classpath resource.
- *
- * An include flagged with `override=true` replaces properties that already exist.
- */
-public class IncludeDef {
-    private boolean resource = false;
-    boolean override = false;
-    private String file;
-
-    /** @return the resource flag */
-    public boolean isResource() {
-        return this.resource;
-    }
-
-    /** @param resource the new resource flag */
-    public void setResource(boolean resource) {
-        this.resource = resource;
-    }
-
-    /** @return the override flag */
-    public boolean isOverride() {
-        return this.override;
-    }
-
-    /** @param override the new override flag */
-    public void setOverride(boolean override) {
-        this.override = override;
-    }
-
-    /** @return the include's file value */
-    public String getFile() {
-        return this.file;
-    }
-
-    /** @param file the include's file value */
-    public void setFile(String file) {
-        this.file = file;
-    }
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/external/flux/flux-core/src/main/java/org/apache/storm/flux/model/ObjectDef.java
----------------------------------------------------------------------
diff --git a/external/flux/flux-core/src/main/java/org/apache/storm/flux/model/ObjectDef.java b/external/flux/flux-core/src/main/java/org/apache/storm/flux/model/ObjectDef.java
deleted file mode 100644
index 04a7e8a..0000000
--- a/external/flux/flux-core/src/main/java/org/apache/storm/flux/model/ObjectDef.java
+++ /dev/null
@@ -1,95 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.flux.model;
-
-import org.apache.storm.Config;
-
-import java.util.ArrayList;
-import java.util.LinkedHashMap;
-import java.util.List;
-import java.util.Map;
-
-/**
- * A representation of a Java object that, given a className, constructor arguments,
- * and properties, can be instantiated.
- *
- * When constructor arguments are set, any argument that is a single-entry map keyed by "ref"
- * or "reflist" is replaced by a {@link BeanReference} or {@link BeanListReference}.
- */
-public class ObjectDef {
-    private String className;
-    private List<Object> constructorArgs;
-    private boolean hasReferences;
-    private List<PropertyDef> properties;
-    private List<ConfigMethodDef> configMethods;
-
-    public String getClassName() {
-        return className;
-    }
-
-    public void setClassName(String className) {
-        this.className = className;
-    }
-
-    /**
-     * @return the converted constructor arguments, or null if none have been set
-     */
-    public List<Object> getConstructorArgs() {
-        return constructorArgs;
-    }
-
-    /**
-     * Stores a converted copy of the given constructor arguments.
-     *
-     * Each element that is a {@link LinkedHashMap} with exactly one entry keyed "ref" or
-     * "reflist" is swapped for the corresponding reference object; every other element is
-     * kept as-is.
-     *
-     * @param constructorArgs the raw arguments; null is accepted and clears the stored list,
-     *                        after which {@link #hasConstructorArgs()} returns false
-     */
-    public void setConstructorArgs(List<Object> constructorArgs) {
-        // Recomputed on every call so the flag never describes a previous argument list.
-        boolean foundReference = false;
-        List<Object> converted = null;
-        if (constructorArgs != null) {
-            converted = new ArrayList<Object>(constructorArgs.size());
-            for (Object arg : constructorArgs) {
-                Object resolved = toReference(arg);
-                if (resolved != arg) {
-                    foundReference = true;
-                }
-                converted.add(resolved);
-            }
-        }
-        this.constructorArgs = converted;
-        this.hasReferences = foundReference;
-    }
-
-    /**
-     * @return true if a non-empty constructor argument list has been set
-     */
-    public boolean hasConstructorArgs() {
-        return this.constructorArgs != null && !this.constructorArgs.isEmpty();
-    }
-
-    /**
-     * @return true if the most recently set constructor arguments contained a "ref" or "reflist" entry
-     */
-    public boolean hasReferences() {
-        return this.hasReferences;
-    }
-
-    public List<PropertyDef> getProperties() {
-        return properties;
-    }
-
-    public void setProperties(List<PropertyDef> properties) {
-        this.properties = properties;
-    }
-
-    public List<ConfigMethodDef> getConfigMethods() {
-        return configMethods;
-    }
-
-    public void setConfigMethods(List<ConfigMethodDef> configMethods) {
-        this.configMethods = configMethods;
-    }
-
-    // Returns a reference object for a single-entry "ref"/"reflist" map, otherwise the argument itself.
-    private static Object toReference(Object arg) {
-        if (!(arg instanceof LinkedHashMap)) {
-            return arg;
-        }
-        Map<?, ?> map = (Map<?, ?>) arg;
-        if (map.size() != 1) {
-            return arg;
-        }
-        if (map.containsKey("ref")) {
-            return new BeanReference((String) map.get("ref"));
-        }
-        if (map.containsKey("reflist")) {
-            // The value is trusted to be a list of ids, exactly as the original cast assumed.
-            @SuppressWarnings("unchecked")
-            List<String> ids = (List<String>) map.get("reflist");
-            return new BeanListReference(ids);
-        }
-        return arg;
-    }
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/external/flux/flux-core/src/main/java/org/apache/storm/flux/model/PropertyDef.java
----------------------------------------------------------------------
diff --git a/external/flux/flux-core/src/main/java/org/apache/storm/flux/model/PropertyDef.java b/external/flux/flux-core/src/main/java/org/apache/storm/flux/model/PropertyDef.java
deleted file mode 100644
index f3d7704..0000000
--- a/external/flux/flux-core/src/main/java/org/apache/storm/flux/model/PropertyDef.java
+++ /dev/null
@@ -1,58 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.flux.model;
-
-/**
- * A named property that carries either a literal value or a reference, never both.
- */
-public class PropertyDef {
-    private static final String VALUE_AND_REF_MESSAGE =
-            "A property can only have a value OR a reference, not both.";
-
-    private String name;
-    private Object value;
-    private String ref;
-
-    /** @return the property name */
-    public String getName() {
-        return this.name;
-    }
-
-    /** @param name the property name */
-    public void setName(String name) {
-        this.name = name;
-    }
-
-    /** @return the literal value, or null if none has been set */
-    public Object getValue() {
-        return this.value;
-    }
-
-    /**
-     * Sets the literal value.
-     *
-     * @param value the literal value
-     * @throws IllegalStateException if a reference has already been set
-     */
-    public void setValue(Object value) {
-        boolean referenceAlreadySet = this.ref != null;
-        if (referenceAlreadySet) {
-            throw new IllegalStateException(VALUE_AND_REF_MESSAGE);
-        }
-        this.value = value;
-    }
-
-    /** @return the reference, or null if none has been set */
-    public String getRef() {
-        return this.ref;
-    }
-
-    /**
-     * Sets the reference.
-     *
-     * @param ref the reference
-     * @throws IllegalStateException if a literal value has already been set
-     */
-    public void setRef(String ref) {
-        boolean valueAlreadySet = this.value != null;
-        if (valueAlreadySet) {
-            throw new IllegalStateException(VALUE_AND_REF_MESSAGE);
-        }
-        this.ref = ref;
-    }
-
-    /** @return true if a reference has been set */
-    public boolean isReference() {
-        return this.ref != null;
-    }
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/external/flux/flux-core/src/main/java/org/apache/storm/flux/model/SpoutDef.java
----------------------------------------------------------------------
diff --git a/external/flux/flux-core/src/main/java/org/apache/storm/flux/model/SpoutDef.java b/external/flux/flux-core/src/main/java/org/apache/storm/flux/model/SpoutDef.java
deleted file mode 100644
index 277c601..0000000
--- a/external/flux/flux-core/src/main/java/org/apache/storm/flux/model/SpoutDef.java
+++ /dev/null
@@ -1,24 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.flux.model;
-
-/**
- * Definition of a Storm spout.
- *
- * Declares no members of its own; everything is inherited from {@link VertexDef}.
- */
-public class SpoutDef extends VertexDef {
-    // intentionally empty
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/external/flux/flux-core/src/main/java/org/apache/storm/flux/model/StreamDef.java
----------------------------------------------------------------------
diff --git a/external/flux/flux-core/src/main/java/org/apache/storm/flux/model/StreamDef.java b/external/flux/flux-core/src/main/java/org/apache/storm/flux/model/StreamDef.java
deleted file mode 100644
index da80f1c..0000000
--- a/external/flux/flux-core/src/main/java/org/apache/storm/flux/model/StreamDef.java
+++ /dev/null
@@ -1,64 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.flux.model;
-
-/**
- * One edge of the topology DAG: a stream of tuples flowing from one Storm component
- * (spout or bolt) to another.
- *
- * `from` and `to` name the source and destination, and `grouping` describes the stream
- * grouping; all three are required.
- */
-public class StreamDef {
-
-    // not used; placeholder for GUI, etc.
-    private String name;
-    private String from;
-    private String to;
-    private GroupingDef grouping;
-
-    /** @return the stream name */
-    public String getName() {
-        return this.name;
-    }
-
-    /** @param name the stream name */
-    public void setName(String name) {
-        this.name = name;
-    }
-
-    /** @return the source of the stream */
-    public String getFrom() {
-        return this.from;
-    }
-
-    /** @param from the source of the stream */
-    public void setFrom(String from) {
-        this.from = from;
-    }
-
-    /** @return the destination of the stream */
-    public String getTo() {
-        return this.to;
-    }
-
-    /** @param to the destination of the stream */
-    public void setTo(String to) {
-        this.to = to;
-    }
-
-    /** @return the stream grouping */
-    public GroupingDef getGrouping() {
-        return this.grouping;
-    }
-
-    /** @param grouping the stream grouping */
-    public void setGrouping(GroupingDef grouping) {
-        this.grouping = grouping;
-    }
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/external/flux/flux-core/src/main/java/org/apache/storm/flux/model/TopologyDef.java
----------------------------------------------------------------------
diff --git a/external/flux/flux-core/src/main/java/org/apache/storm/flux/model/TopologyDef.java b/external/flux/flux-core/src/main/java/org/apache/storm/flux/model/TopologyDef.java
deleted file mode 100644
index 86614f1..0000000
--- a/external/flux/flux-core/src/main/java/org/apache/storm/flux/model/TopologyDef.java
+++ /dev/null
@@ -1,216 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.flux.model;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.*;
-
-/**
- * Bean representation of a topology.
- *
- * It consists of the following:
- * 1. The topology name
- * 2. A `java.util.Map` representing the `org.apache.storm.config` for the topology
- * 3. A list of spout definitions
- * 4. A list of bolt definitions
- * 5. A list of stream definitions that define the flow between spouts and bolts.
- *
- * A topology source may be set instead of items 3-5; {@link #validate()} rejects a
- * definition that has both.
- */
-public class TopologyDef {
-    private static final Logger LOG = LoggerFactory.getLogger(TopologyDef.class);
-
-    private String name;
-    private Map<String, BeanDef> componentMap = new LinkedHashMap<String, BeanDef>(); // not required
-    private List<IncludeDef> includes; // not required
-    private Map<String, Object> config = new HashMap<String, Object>();
-
-    // a "topology source" is a class that can produce a `StormTopology` thrift object.
-    private TopologySourceDef topologySource;
-
-    // the following are required if we're defining a core storm topology DAG in YAML, etc.
-    private Map<String, BoltDef> boltMap = new LinkedHashMap<String, BoltDef>();
-    private Map<String, SpoutDef> spoutMap = new LinkedHashMap<String, SpoutDef>();
-    private List<StreamDef> streams = new ArrayList<StreamDef>();
-
-
-    public String getName() {
-        return name;
-    }
-
-    public void setName(String name) {
-        this.name = name;
-    }
-
-    /**
-     * Sets the name only if none is set yet or {@code override} is true; otherwise logs a
-     * warning and keeps the current name.
-     */
-    public void setName(String name, boolean override) {
-        if (this.name == null || override) {
-            this.name = name;
-        } else {
-            LOG.warn("Ignoring attempt to set property 'name' with override == false.");
-        }
-    }
-
-    /**
-     * @return a new list of the spout definitions, in insertion order
-     */
-    public List<SpoutDef> getSpouts() {
-        return new ArrayList<SpoutDef>(this.spoutMap.values());
-    }
-
-    /**
-     * Replaces all spout definitions, indexing them by id.
-     *
-     * @param spouts the new definitions; null is treated as an empty list
-     */
-    public void setSpouts(List<SpoutDef> spouts) {
-        this.spoutMap = new LinkedHashMap<String, SpoutDef>();
-        if (spouts == null) {
-            return;
-        }
-        for (SpoutDef spout : spouts) {
-            this.spoutMap.put(spout.getId(), spout);
-        }
-    }
-
-    /**
-     * @return a new list of the bolt definitions, in insertion order
-     */
-    public List<BoltDef> getBolts() {
-        return new ArrayList<BoltDef>(this.boltMap.values());
-    }
-
-    /**
-     * Replaces all bolt definitions, indexing them by id.
-     *
-     * @param bolts the new definitions; null is treated as an empty list
-     */
-    public void setBolts(List<BoltDef> bolts) {
-        this.boltMap = new LinkedHashMap<String, BoltDef>();
-        if (bolts == null) {
-            return;
-        }
-        for (BoltDef bolt : bolts) {
-            this.boltMap.put(bolt.getId(), bolt);
-        }
-    }
-
-    public List<StreamDef> getStreams() {
-        return streams;
-    }
-
-    public void setStreams(List<StreamDef> streams) {
-        this.streams = streams;
-    }
-
-    public Map<String, Object> getConfig() {
-        return config;
-    }
-
-    public void setConfig(Map<String, Object> config) {
-        this.config = config;
-    }
-
-    /**
-     * @return a new list of the component definitions, in insertion order
-     */
-    public List<BeanDef> getComponents() {
-        return new ArrayList<BeanDef>(this.componentMap.values());
-    }
-
-    /**
-     * Replaces all component definitions, indexing them by id.
-     *
-     * @param components the new definitions; null is treated as an empty list
-     */
-    public void setComponents(List<BeanDef> components) {
-        this.componentMap = new LinkedHashMap<String, BeanDef>();
-        if (components == null) {
-            return;
-        }
-        for (BeanDef component : components) {
-            this.componentMap.put(component.getId(), component);
-        }
-    }
-
-    public List<IncludeDef> getIncludes() {
-        return includes;
-    }
-
-    public void setIncludes(List<IncludeDef> includes) {
-        this.includes = includes;
-    }
-
-    // utility methods
-
-    /**
-     * @return the parallelism of the bolt with the given id; throws a NullPointerException
-     *         if no bolt is registered under that id
-     */
-    public int parallelismForBolt(String boltId) {
-        return this.boltMap.get(boltId).getParallelism();
-    }
-
-    /**
-     * @return the bolt definition with the given id, or null if there is none
-     */
-    public BoltDef getBoltDef(String id) {
-        return this.boltMap.get(id);
-    }
-
-    /**
-     * @return the spout definition with the given id, or null if there is none
-     */
-    public SpoutDef getSpoutDef(String id) {
-        return this.spoutMap.get(id);
-    }
-
-    /**
-     * @return the component definition with the given id, or null if there is none
-     */
-    public BeanDef getComponent(String id) {
-        return this.componentMap.get(id);
-    }
-
-    // used by includes implementation
-
-    /**
-     * Adds the given bolts. A bolt whose id is already present is replaced only when
-     * {@code override} is true; otherwise it is skipped with a warning.
-     */
-    public void addAllBolts(List<BoltDef> bolts, boolean override) {
-        for (BoltDef bolt : bolts) {
-            String id = bolt.getId();
-            if (this.boltMap.get(id) == null || override) {
-                this.boltMap.put(id, bolt);
-            } else {
-                LOG.warn("Ignoring attempt to create bolt '{}' with override == false.", id);
-            }
-        }
-    }
-
-    /**
-     * Adds the given spouts. A spout whose id is already present is replaced only when
-     * {@code override} is true; otherwise it is skipped with a warning.
-     */
-    public void addAllSpouts(List<SpoutDef> spouts, boolean override) {
-        for (SpoutDef spout : spouts) {
-            String id = spout.getId();
-            if (this.spoutMap.get(id) == null || override) {
-                this.spoutMap.put(id, spout);
-            } else {
-                LOG.warn("Ignoring attempt to create spout '{}' with override == false.", id);
-            }
-        }
-    }
-
-    /**
-     * Adds the given components. A component whose id is already present is replaced only
-     * when {@code override} is true; otherwise it is skipped with a warning.
-     */
-    public void addAllComponents(List<BeanDef> components, boolean override) {
-        for (BeanDef bean : components) {
-            String id = bean.getId();
-            if (this.componentMap.get(id) == null || override) {
-                this.componentMap.put(id, bean);
-            } else {
-                LOG.warn("Ignoring attempt to create component '{}' with override == false.", id);
-            }
-        }
-    }
-
-    /**
-     * Appends the given streams to the existing ones. The {@code override} flag is
-     * currently ignored.
-     */
-    public void addAllStreams(List<StreamDef> streams, boolean override) {
-        //TODO figure out how we want to deal with overrides. Users may want to add streams even when overriding other
-        // properties. For now we just add them blindly which could lead to a potentially invalid topology.
-        this.streams.addAll(streams);
-    }
-
-    public TopologySourceDef getTopologySource() {
-        return topologySource;
-    }
-
-    public void setTopologySource(TopologySourceDef topologySource) {
-        this.topologySource = topologySource;
-    }
-
-    /**
-     * @return true if no topology source has been set
-     */
-    public boolean isDslTopology() {
-        return this.topologySource == null;
-    }
-
-
-    /**
-     * Checks that a topology source and DSL elements are not defined at the same time.
-     *
-     * @return false only when a topology source is set together with at least one spout,
-     *         bolt or stream; true in every other case, including a DSL topology that
-     *         defines no spouts, bolts or streams at all
-     */
-    public boolean validate() {
-        boolean hasSpouts = this.spoutMap != null && !this.spoutMap.isEmpty();
-        boolean hasBolts = this.boltMap != null && !this.boltMap.isEmpty();
-        boolean hasStreams = this.streams != null && !this.streams.isEmpty();
-        // you can't define a topologySource and a DSL topology at the same time...
-        return isDslTopology() || !(hasSpouts || hasBolts || hasStreams);
-    }
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/external/flux/flux-core/src/main/java/org/apache/storm/flux/model/TopologySourceDef.java
----------------------------------------------------------------------
diff --git a/external/flux/flux-core/src/main/java/org/apache/storm/flux/model/TopologySourceDef.java b/external/flux/flux-core/src/main/java/org/apache/storm/flux/model/TopologySourceDef.java
deleted file mode 100644
index d6a2f57..0000000
--- a/external/flux/flux-core/src/main/java/org/apache/storm/flux/model/TopologySourceDef.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.flux.model;
-
-/**
- * An {@link ObjectDef} that additionally carries a method name, which defaults to
- * {@link #DEFAULT_METHOD_NAME}.
- */
-public class TopologySourceDef extends ObjectDef {
-    public static final String DEFAULT_METHOD_NAME = "getTopology";
-
-    // starts out as the default until a caller sets something else
-    private String methodName = DEFAULT_METHOD_NAME;
-
-    /**
-     * Creates a definition whose method name is {@link #DEFAULT_METHOD_NAME}.
-     */
-    public TopologySourceDef() {
-    }
-
-    /** @return the method name */
-    public String getMethodName() {
-        return this.methodName;
-    }
-
-    /** @param methodName the method name */
-    public void setMethodName(String methodName) {
-        this.methodName = methodName;
-    }
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/external/flux/flux-core/src/main/java/org/apache/storm/flux/model/VertexDef.java
----------------------------------------------------------------------
diff --git a/external/flux/flux-core/src/main/java/org/apache/storm/flux/model/VertexDef.java b/external/flux/flux-core/src/main/java/org/apache/storm/flux/model/VertexDef.java
deleted file mode 100644
index e71bcc2..0000000
--- a/external/flux/flux-core/src/main/java/org/apache/storm/flux/model/VertexDef.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.flux.model;
-
-/**
- * Abstract base class for component definitions (spouts/bolts); adds a parallelism
- * setting on top of {@link BeanDef}.
- */
-public abstract class VertexDef extends BeanDef {
-
-    // Defaults to 1 so that the topology still functions when parallelism is omitted.
-    private int parallelism = 1;
-
-    /** @return the parallelism; 1 unless it has been set */
-    public int getParallelism() {
-        return this.parallelism;
-    }
-
-    /** @param parallelism the parallelism */
-    public void setParallelism(int parallelism) {
-        this.parallelism = parallelism;
-    }
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/external/flux/flux-core/src/main/java/org/apache/storm/flux/parser/FluxParser.java
----------------------------------------------------------------------
diff --git a/external/flux/flux-core/src/main/java/org/apache/storm/flux/parser/FluxParser.java b/external/flux/flux-core/src/main/java/org/apache/storm/flux/parser/FluxParser.java
deleted file mode 100644
index 2a18474..0000000
--- a/external/flux/flux-core/src/main/java/org/apache/storm/flux/parser/FluxParser.java
+++ /dev/null
@@ -1,208 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.flux.parser;
-
-import org.apache.storm.flux.api.TopologySource;
-import org.apache.storm.flux.model.BoltDef;
-import org.apache.storm.flux.model.IncludeDef;
-import org.apache.storm.flux.model.SpoutDef;
-import org.apache.storm.flux.model.TopologyDef;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.yaml.snakeyaml.TypeDescription;
-import org.yaml.snakeyaml.Yaml;
-import org.yaml.snakeyaml.constructor.Constructor;
-
-import java.io.ByteArrayOutputStream;
-import java.io.FileInputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.nio.ByteBuffer;
-import java.util.Map;
-import java.util.Properties;
-
-public class FluxParser {
- private static final Logger LOG = LoggerFactory.getLogger(FluxParser.class);
-
- private FluxParser(){}
-
- // TODO refactor input stream processing (see parseResource() method).
- public static TopologyDef parseFile(String inputFile, boolean dumpYaml, boolean processIncludes,
- String propertiesFile, boolean envSub) throws IOException {
-
- FileInputStream in = new FileInputStream(inputFile);
- TopologyDef topology = parseInputStream(in, dumpYaml, processIncludes, propertiesFile, envSub);
- in.close();
-
- return topology;
- }
-
- public static TopologyDef parseResource(String resource, boolean dumpYaml, boolean processIncludes,
- String propertiesFile, boolean envSub) throws IOException {
-
- InputStream in = FluxParser.class.getResourceAsStream(resource);
- TopologyDef topology = parseInputStream(in, dumpYaml, processIncludes, propertiesFile, envSub);
- in.close();
-
- return topology;
- }
-
- public static TopologyDef parseInputStream(InputStream inputStream, boolean dumpYaml, boolean processIncludes,
- String propertiesFile, boolean envSub) throws IOException {
-
- Yaml yaml = yaml();
-
- if (inputStream == null) {
- LOG.error("Unable to load input stream");
- System.exit(1);
- }
-
- TopologyDef topology = loadYaml(yaml, inputStream, propertiesFile, envSub);
-
- if (dumpYaml) {
- dumpYaml(topology, yaml);
- }
-
- if (processIncludes) {
- return processIncludes(yaml, topology, propertiesFile, envSub);
- } else {
- return topology;
- }
- }
-
- private static TopologyDef loadYaml(Yaml yaml, InputStream in, String propsFile, boolean envSubstitution) throws IOException {
- ByteArrayOutputStream bos = new ByteArrayOutputStream();
- LOG.info("loading YAML from input stream...");
- int b = -1;
- while((b = in.read()) != -1){
- bos.write(b);
- }
-
- // TODO substitution implementation is not exactly efficient or kind to memory...
- String str = bos.toString();
- // properties file substitution
- if(propsFile != null){
- LOG.info("Performing property substitution.");
- InputStream propsIn = new FileInputStream(propsFile);
- Properties props = new Properties();
- props.load(propsIn);
- for(Object key : props.keySet()){
- str = str.replace("${" + key + "}", props.getProperty((String)key));
- }
- } else {
- LOG.info("Not performing property substitution.");
- }
-
- // environment variable substitution
- if(envSubstitution){
- LOG.info("Performing environment variable substitution...");
- Map<String, String> envs = System.getenv();
- for(String key : envs.keySet()){
- str = str.replace("${ENV-" + key + "}", envs.get(key));
- }
- } else {
- LOG.info("Not performing environment variable substitution.");
- }
- return (TopologyDef)yaml.load(str);
- }
-
- private static void dumpYaml(TopologyDef topology, Yaml yaml){
- System.out.println("Configuration (interpreted): \n" + yaml.dump(topology));
- }
-
- private static Yaml yaml(){
- Constructor constructor = new Constructor(TopologyDef.class);
-
- TypeDescription topologyDescription = new TypeDescription(TopologyDef.class);
- topologyDescription.putListPropertyType("spouts", SpoutDef.class);
- topologyDescription.putListPropertyType("bolts", BoltDef.class);
- topologyDescription.putListPropertyType("includes", IncludeDef.class);
- constructor.addTypeDescription(topologyDescription);
-
- Yaml yaml = new Yaml(constructor);
- return yaml;
- }
-
- /**
- *
- * @param yaml the yaml parser for parsing the include file(s)
- * @param topologyDef the topology definition containing (possibly zero) includes
- * @return The TopologyDef with includes resolved.
- */
- private static TopologyDef processIncludes(Yaml yaml, TopologyDef topologyDef, String propsFile, boolean envSub)
- throws IOException {
- //TODO support multiple levels of includes
- if(topologyDef.getIncludes() != null) {
- for (IncludeDef include : topologyDef.getIncludes()){
- TopologyDef includeTopologyDef = null;
- if (include.isResource()) {
- LOG.info("Loading includes from resource: {}", include.getFile());
- includeTopologyDef = parseResource(include.getFile(), true, false, propsFile, envSub);
- } else {
- LOG.info("Loading includes from file: {}", include.getFile());
- includeTopologyDef = parseFile(include.getFile(), true, false, propsFile, envSub);
- }
-
- // if overrides are disabled, we won't replace anything that already exists
- boolean override = include.isOverride();
- // name
- if(includeTopologyDef.getName() != null){
- topologyDef.setName(includeTopologyDef.getName(), override);
- }
-
- // config
- if(includeTopologyDef.getConfig() != null) {
- //TODO move this logic to the model class
- Map<String, Object> config = topologyDef.getConfig();
- Map<String, Object> includeConfig = includeTopologyDef.getConfig();
- if(override) {
- config.putAll(includeTopologyDef.getConfig());
- } else {
- for(String key : includeConfig.keySet()){
- if(config.containsKey(key)){
- LOG.warn("Ignoring attempt to set topology config property '{}' with override == false", key);
- }
- else {
- config.put(key, includeConfig.get(key));
- }
- }
- }
- }
-
- //component overrides
- if(includeTopologyDef.getComponents() != null){
- topologyDef.addAllComponents(includeTopologyDef.getComponents(), override);
- }
- //bolt overrides
- if(includeTopologyDef.getBolts() != null){
- topologyDef.addAllBolts(includeTopologyDef.getBolts(), override);
- }
- //spout overrides
- if(includeTopologyDef.getSpouts() != null) {
- topologyDef.addAllSpouts(includeTopologyDef.getSpouts(), override);
- }
- //stream overrides
- //TODO streams should be uniquely identifiable
- if(includeTopologyDef.getStreams() != null) {
- topologyDef.addAllStreams(includeTopologyDef.getStreams(), override);
- }
- } // end include processing
- }
- return topologyDef;
- }
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/external/flux/flux-core/src/main/resources/splash.txt
----------------------------------------------------------------------
diff --git a/external/flux/flux-core/src/main/resources/splash.txt b/external/flux/flux-core/src/main/resources/splash.txt
deleted file mode 100644
index 337931a..0000000
--- a/external/flux/flux-core/src/main/resources/splash.txt
+++ /dev/null
@@ -1,9 +0,0 @@
-\u2588\u2588\u2588\u2588\u2588\u2588\u2588\u2557\u2588\u2588\u2557 \u2588\u2588\u2557 \u2588\u2588\u2557\u2588\u2588\u2557 \u2588\u2588\u2557
-\u2588\u2588\u2554\u2550\u2550\u2550\u2550\u255d\u2588\u2588\u2551 \u2588\u2588\u2551 \u2588\u2588\u2551\u255a\u2588\u2588\u2557\u2588\u2588\u2554\u255d
-\u2588\u2588\u2588\u2588\u2588\u2557 \u2588\u2588\u2551 \u2588\u2588\u2551 \u2588\u2588\u2551 \u255a\u2588\u2588\u2588\u2554\u255d
-\u2588\u2588\u2554\u2550\u2550\u255d \u2588\u2588\u2551 \u2588\u2588\u2551 \u2588\u2588\u2551 \u2588\u2588\u2554\u2588\u2588\u2557
-\u2588\u2588\u2551 \u2588\u2588\u2588\u2588\u2588\u2588\u2588\u2557\u255a\u2588\u2588\u2588\u2588\u2588\u2588\u2554\u255d\u2588\u2588\u2554\u255d \u2588\u2588\u2557
-\u255a\u2550\u255d \u255a\u2550\u2550\u2550\u2550\u2550\u2550\u255d \u255a\u2550\u2550\u2550\u2550\u2550\u255d \u255a\u2550\u255d \u255a\u2550\u255d
-+- Apache Storm -+
-+- data FLow User eXperience -+
-Version: ${project.version}
http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/external/flux/flux-core/src/test/java/org/apache/storm/flux/FluxBuilderTest.java
----------------------------------------------------------------------
diff --git a/external/flux/flux-core/src/test/java/org/apache/storm/flux/FluxBuilderTest.java b/external/flux/flux-core/src/test/java/org/apache/storm/flux/FluxBuilderTest.java
deleted file mode 100644
index ff67867..0000000
--- a/external/flux/flux-core/src/test/java/org/apache/storm/flux/FluxBuilderTest.java
+++ /dev/null
@@ -1,31 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.flux;
-
-import org.junit.Test;
-import static org.junit.Assert.*;
-
-public class FluxBuilderTest {
-
- @Test
- public void testIsPrimitiveNumber() throws Exception {
- assertTrue(FluxBuilder.isPrimitiveNumber(int.class));
- assertFalse(FluxBuilder.isPrimitiveNumber(boolean.class));
- assertFalse(FluxBuilder.isPrimitiveNumber(String.class));
- }
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/external/flux/flux-core/src/test/java/org/apache/storm/flux/IntegrationTest.java
----------------------------------------------------------------------
diff --git a/external/flux/flux-core/src/test/java/org/apache/storm/flux/IntegrationTest.java b/external/flux/flux-core/src/test/java/org/apache/storm/flux/IntegrationTest.java
deleted file mode 100644
index c5807f8..0000000
--- a/external/flux/flux-core/src/test/java/org/apache/storm/flux/IntegrationTest.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.flux;
-
-import org.junit.Test;
-
-public class IntegrationTest {
-
- private static boolean skipTest = true;
-
- static {
- String skipStr = System.getProperty("skipIntegration");
- if(skipStr != null && skipStr.equalsIgnoreCase("false")){
- skipTest = false;
- }
- }
-
- @Test
- public void testRunTopologySource() throws Exception {
- if(!skipTest) {
- Flux.main(new String[]{"-s", "30000", "src/test/resources/configs/existing-topology.yaml"});
- }
- }
-}