You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by ka...@apache.org on 2017/04/05 23:19:12 UTC
[09/23] storm git commit: STORM-2453 Move non-connectors into the top
directory
http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/flux/flux-core/src/main/java/org/apache/storm/flux/model/ExecutionContext.java
----------------------------------------------------------------------
diff --git a/flux/flux-core/src/main/java/org/apache/storm/flux/model/ExecutionContext.java b/flux/flux-core/src/main/java/org/apache/storm/flux/model/ExecutionContext.java
new file mode 100644
index 0000000..1520006
--- /dev/null
+++ b/flux/flux-core/src/main/java/org/apache/storm/flux/model/ExecutionContext.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.flux.model;
+
+import org.apache.storm.Config;
+import org.apache.storm.task.IBolt;
+import org.apache.storm.topology.IRichSpout;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Container for all the objects required to instantiate a topology.
+ *
+ * It carries the parsed {@link TopologyDef} and the Storm {@link Config} it was
+ * constructed with, plus id-keyed registries for spouts, bolts and other components.
+ */
+public class ExecutionContext {
+    // parsed Topology definition
+    TopologyDef topologyDef;
+
+    // Storm config
+    private final Config config;
+
+    // components (declared, but not populated by any method of this class)
+    private List<Object> components;
+    // components indexed by id
+    private final Map<String, Object> componentMap = new HashMap<String, Object>();
+
+    // spouts indexed by id
+    private final Map<String, IRichSpout> spoutMap = new HashMap<String, IRichSpout>();
+
+    // bolts (declared, but not populated by any method of this class)
+    private List<IBolt> bolts;
+    // bolts indexed by id; values are plain Objects, so no bolt interface is enforced here
+    private final Map<String, Object> boltMap = new HashMap<String, Object>();
+
+    /**
+     * Creates a context for the given topology definition and Storm configuration.
+     *
+     * @param topologyDef the parsed topology definition
+     * @param config the Storm config
+     */
+    public ExecutionContext(TopologyDef topologyDef, Config config) {
+        this.topologyDef = topologyDef;
+        this.config = config;
+    }
+
+    /**
+     * @return the topology definition this context was created with
+     */
+    public TopologyDef getTopologyDef() {
+        return this.topologyDef;
+    }
+
+    /**
+     * Registers a spout under the given id, replacing any spout previously stored under that id.
+     *
+     * @param id the spout id
+     * @param spout the spout instance
+     */
+    public void addSpout(String id, IRichSpout spout) {
+        this.spoutMap.put(id, spout);
+    }
+
+    /**
+     * Registers a bolt under the given id, replacing any bolt previously stored under that id.
+     *
+     * @param id the bolt id
+     * @param bolt the bolt instance
+     */
+    public void addBolt(String id, Object bolt) {
+        this.boltMap.put(id, bolt);
+    }
+
+    /**
+     * @param id the bolt id
+     * @return the bolt registered under {@code id}, or {@code null} if there is none
+     */
+    public Object getBolt(String id) {
+        return this.boltMap.get(id);
+    }
+
+    /**
+     * Registers a component under the given id, replacing any component previously stored under that id.
+     *
+     * @param id the component id
+     * @param value the component instance
+     */
+    public void addComponent(String id, Object value) {
+        this.componentMap.put(id, value);
+    }
+
+    /**
+     * @param id the component id
+     * @return the component registered under {@code id}, or {@code null} if there is none
+     */
+    public Object getComponent(String id) {
+        return this.componentMap.get(id);
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/flux/flux-core/src/main/java/org/apache/storm/flux/model/GroupingDef.java
----------------------------------------------------------------------
diff --git a/flux/flux-core/src/main/java/org/apache/storm/flux/model/GroupingDef.java b/flux/flux-core/src/main/java/org/apache/storm/flux/model/GroupingDef.java
new file mode 100644
index 0000000..e4fac8e
--- /dev/null
+++ b/flux/flux-core/src/main/java/org/apache/storm/flux/model/GroupingDef.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.flux.model;
+
+import java.util.List;
+
+/**
+ * Bean representation of a Storm stream grouping.
+ *
+ * All properties are plain read/write bean properties with no validation.
+ */
+public class GroupingDef {
+
+    /**
+     * Types of stream groupings Storm allows.
+     */
+    public enum Type {
+        ALL,
+        CUSTOM,
+        DIRECT,
+        SHUFFLE,
+        LOCAL_OR_SHUFFLE,
+        FIELDS,
+        GLOBAL,
+        NONE
+    }
+
+    // which kind of grouping this is
+    private Type type;
+    // id of the stream the grouping applies to
+    private String streamId;
+    // string arguments of the grouping
+    private List<String> args;
+    // object definition attached to the grouping (presumably for Type.CUSTOM -- confirm against the builder)
+    private ObjectDef customClass;
+
+    /**
+     * @return the grouping type, or {@code null} if not set
+     */
+    public Type getType() {
+        return this.type;
+    }
+
+    /**
+     * @param type the grouping type
+     */
+    public void setType(Type type) {
+        this.type = type;
+    }
+
+    /**
+     * @return the stream id, or {@code null} if not set
+     */
+    public String getStreamId() {
+        return this.streamId;
+    }
+
+    /**
+     * @param streamId the stream id
+     */
+    public void setStreamId(String streamId) {
+        this.streamId = streamId;
+    }
+
+    /**
+     * @return the grouping arguments, or {@code null} if not set
+     */
+    public List<String> getArgs() {
+        return this.args;
+    }
+
+    /**
+     * @param args the grouping arguments
+     */
+    public void setArgs(List<String> args) {
+        this.args = args;
+    }
+
+    /**
+     * @return the custom class definition, or {@code null} if not set
+     */
+    public ObjectDef getCustomClass() {
+        return this.customClass;
+    }
+
+    /**
+     * @param customClass the custom class definition
+     */
+    public void setCustomClass(ObjectDef customClass) {
+        this.customClass = customClass;
+    }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/flux/flux-core/src/main/java/org/apache/storm/flux/model/IncludeDef.java
----------------------------------------------------------------------
diff --git a/flux/flux-core/src/main/java/org/apache/storm/flux/model/IncludeDef.java b/flux/flux-core/src/main/java/org/apache/storm/flux/model/IncludeDef.java
new file mode 100644
index 0000000..23fd9d2
--- /dev/null
+++ b/flux/flux-core/src/main/java/org/apache/storm/flux/model/IncludeDef.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.flux.model;
+
+/**
+ * Represents an include. Includes can be either a file or a classpath resource.
+ *
+ * If an include is marked as `override=true` then existing properties will be replaced.
+ *
+ */
+public class IncludeDef {
+    // location of the include
+    private String file;
+    // true when the include is a classpath resource; defaults to false
+    private boolean resource;
+    // true when the include may replace existing properties; defaults to false
+    boolean override;
+
+    /**
+     * @return the location of the include, or {@code null} if not set
+     */
+    public String getFile() {
+        return this.file;
+    }
+
+    /**
+     * @param file the location of the include
+     */
+    public void setFile(String file) {
+        this.file = file;
+    }
+
+    /**
+     * @return {@code true} if the include is flagged as a classpath resource
+     */
+    public boolean isResource() {
+        return this.resource;
+    }
+
+    /**
+     * @param resource {@code true} to flag the include as a classpath resource
+     */
+    public void setResource(boolean resource) {
+        this.resource = resource;
+    }
+
+    /**
+     * @return {@code true} if the include is flagged as overriding
+     */
+    public boolean isOverride() {
+        return this.override;
+    }
+
+    /**
+     * @param override {@code true} to flag the include as overriding
+     */
+    public void setOverride(boolean override) {
+        this.override = override;
+    }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/flux/flux-core/src/main/java/org/apache/storm/flux/model/ObjectDef.java
----------------------------------------------------------------------
diff --git a/flux/flux-core/src/main/java/org/apache/storm/flux/model/ObjectDef.java b/flux/flux-core/src/main/java/org/apache/storm/flux/model/ObjectDef.java
new file mode 100644
index 0000000..04a7e8a
--- /dev/null
+++ b/flux/flux-core/src/main/java/org/apache/storm/flux/model/ObjectDef.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.flux.model;
+
+import org.apache.storm.Config;
+
+import java.util.ArrayList;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * A representation of a Java object that given a className, constructor arguments,
+ * and properties, can be instantiated.
+ */
+public class ObjectDef {
+ private String className;
+ private List<Object> constructorArgs;
+ private boolean hasReferences;
+ private List<PropertyDef> properties;
+ private List<ConfigMethodDef> configMethods;
+
+ public String getClassName() {
+ return className;
+ }
+
+ public void setClassName(String className) {
+ this.className = className;
+ }
+
+ public List<Object> getConstructorArgs() {
+ return constructorArgs;
+ }
+
+ public void setConstructorArgs(List<Object> constructorArgs) {
+
+ List<Object> newVal = new ArrayList<Object>();
+ for(Object obj : constructorArgs){
+ if(obj instanceof LinkedHashMap){
+ Map map = (Map)obj;
+ if(map.containsKey("ref") && map.size() == 1) {
+ newVal.add(new BeanReference((String) map.get("ref")));
+ this.hasReferences = true;
+ } else if (map.containsKey("reflist") && map.size() == 1) {
+ newVal.add(new BeanListReference((List<String>) map.get("reflist")));
+ this.hasReferences = true;
+ } else {
+ newVal.add(obj);
+ }
+ } else {
+ newVal.add(obj);
+ }
+ }
+ this.constructorArgs = newVal;
+ }
+
+ public boolean hasConstructorArgs(){
+ return this.constructorArgs != null && this.constructorArgs.size() > 0;
+ }
+
+ public boolean hasReferences(){
+ return this.hasReferences;
+ }
+
+ public List<PropertyDef> getProperties() {
+ return properties;
+ }
+
+ public void setProperties(List<PropertyDef> properties) {
+ this.properties = properties;
+ }
+
+ public List<ConfigMethodDef> getConfigMethods() {
+ return configMethods;
+ }
+
+ public void setConfigMethods(List<ConfigMethodDef> configMethods) {
+ this.configMethods = configMethods;
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/flux/flux-core/src/main/java/org/apache/storm/flux/model/PropertyDef.java
----------------------------------------------------------------------
diff --git a/flux/flux-core/src/main/java/org/apache/storm/flux/model/PropertyDef.java b/flux/flux-core/src/main/java/org/apache/storm/flux/model/PropertyDef.java
new file mode 100644
index 0000000..f3d7704
--- /dev/null
+++ b/flux/flux-core/src/main/java/org/apache/storm/flux/model/PropertyDef.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.flux.model;
+
+/**
+ * Bean describing a named property that carries either a literal value or a
+ * reference, never both.
+ */
+public class PropertyDef {
+    private static final String VALUE_XOR_REF = "A property can only have a value OR a reference, not both.";
+
+    private String name;
+    private Object value;
+    private String ref;
+
+    /**
+     * @return the property name, or {@code null} if not set
+     */
+    public String getName() {
+        return this.name;
+    }
+
+    /**
+     * @param name the property name
+     */
+    public void setName(String name) {
+        this.name = name;
+    }
+
+    /**
+     * @return the literal value, or {@code null} if not set
+     */
+    public Object getValue() {
+        return this.value;
+    }
+
+    /**
+     * Sets the literal value.
+     *
+     * @param value the literal value
+     * @throws IllegalStateException if a reference has already been set
+     */
+    public void setValue(Object value) {
+        failIfPresent(this.ref);
+        this.value = value;
+    }
+
+    /**
+     * @return the reference, or {@code null} if not set
+     */
+    public String getRef() {
+        return this.ref;
+    }
+
+    /**
+     * Sets the reference.
+     *
+     * @param ref the reference
+     * @throws IllegalStateException if a value has already been set
+     */
+    public void setRef(String ref) {
+        failIfPresent(this.value);
+        this.ref = ref;
+    }
+
+    /**
+     * @return {@code true} if this property holds a reference
+     */
+    public boolean isReference() {
+        return this.ref != null;
+    }
+
+    // rejects the update when the opposite slot (value vs. ref) is already occupied
+    private static void failIfPresent(Object other) {
+        if (other != null) {
+            throw new IllegalStateException(VALUE_XOR_REF);
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/flux/flux-core/src/main/java/org/apache/storm/flux/model/SpoutDef.java
----------------------------------------------------------------------
diff --git a/flux/flux-core/src/main/java/org/apache/storm/flux/model/SpoutDef.java b/flux/flux-core/src/main/java/org/apache/storm/flux/model/SpoutDef.java
new file mode 100644
index 0000000..277c601
--- /dev/null
+++ b/flux/flux-core/src/main/java/org/apache/storm/flux/model/SpoutDef.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.flux.model;
+
+/**
+ * Bean representation of a Storm spout.
+ *
+ * Declares no members of its own; everything is inherited from {@link VertexDef}.
+ */
+public class SpoutDef extends VertexDef {
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/flux/flux-core/src/main/java/org/apache/storm/flux/model/StreamDef.java
----------------------------------------------------------------------
diff --git a/flux/flux-core/src/main/java/org/apache/storm/flux/model/StreamDef.java b/flux/flux-core/src/main/java/org/apache/storm/flux/model/StreamDef.java
new file mode 100644
index 0000000..da80f1c
--- /dev/null
+++ b/flux/flux-core/src/main/java/org/apache/storm/flux/model/StreamDef.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.flux.model;
+
+/**
+ * Represents a stream of tuples from one Storm component (Spout or Bolt) to another (an edge in the topology DAG).
+ *
+ * Required fields are `from` and `to`, which define the source and destination, and the stream `grouping`.
+ * This bean itself performs no validation of those requirements.
+ *
+ */
+public class StreamDef {
+
+    // not used, placeholder for GUI, etc.
+    private String name;
+    // source of the stream
+    private String from;
+    // destination of the stream
+    private String to;
+    // how tuples are grouped on this stream
+    private GroupingDef grouping;
+
+    /**
+     * @return the stream name, or {@code null} if not set
+     */
+    public String getName() {
+        return this.name;
+    }
+
+    /**
+     * @param name the stream name
+     */
+    public void setName(String name) {
+        this.name = name;
+    }
+
+    /**
+     * @return the source of the stream, or {@code null} if not set
+     */
+    public String getFrom() {
+        return this.from;
+    }
+
+    /**
+     * @param from the source of the stream
+     */
+    public void setFrom(String from) {
+        this.from = from;
+    }
+
+    /**
+     * @return the destination of the stream, or {@code null} if not set
+     */
+    public String getTo() {
+        return this.to;
+    }
+
+    /**
+     * @param to the destination of the stream
+     */
+    public void setTo(String to) {
+        this.to = to;
+    }
+
+    /**
+     * @return the stream grouping, or {@code null} if not set
+     */
+    public GroupingDef getGrouping() {
+        return this.grouping;
+    }
+
+    /**
+     * @param grouping the stream grouping
+     */
+    public void setGrouping(GroupingDef grouping) {
+        this.grouping = grouping;
+    }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/flux/flux-core/src/main/java/org/apache/storm/flux/model/TopologyDef.java
----------------------------------------------------------------------
diff --git a/flux/flux-core/src/main/java/org/apache/storm/flux/model/TopologyDef.java b/flux/flux-core/src/main/java/org/apache/storm/flux/model/TopologyDef.java
new file mode 100644
index 0000000..86614f1
--- /dev/null
+++ b/flux/flux-core/src/main/java/org/apache/storm/flux/model/TopologyDef.java
@@ -0,0 +1,216 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.flux.model;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.*;
+
+/**
+ * Bean representation of a topology.
+ *
+ * It consists of the following:
+ * 1. The topology name
+ * 2. A `java.util.Map` representing the `org.apache.storm.config` for the topology
+ * 3. A list of spout definitions
+ * 4. A list of bolt definitions
+ * 5. A list of stream definitions that define the flow between spouts and bolts.
+ *
+ */
+public class TopologyDef {
+    private static final Logger LOG = LoggerFactory.getLogger(TopologyDef.class);
+
+    private String name;
+    private Map<String, BeanDef> componentMap = new LinkedHashMap<String, BeanDef>(); // not required
+    private List<IncludeDef> includes; // not required
+    private Map<String, Object> config = new HashMap<String, Object>();
+
+    // a "topology source" is a class that can produce a `StormTopology` thrift object.
+    private TopologySourceDef topologySource;
+
+    // the following are required if we're defining a core storm topology DAG in YAML, etc.
+    private Map<String, BoltDef> boltMap = new LinkedHashMap<String, BoltDef>();
+    private Map<String, SpoutDef> spoutMap = new LinkedHashMap<String, SpoutDef>();
+    private List<StreamDef> streams = new ArrayList<StreamDef>();
+
+
+    /**
+     * @return the topology name, or {@code null} if not set
+     */
+    public String getName() {
+        return name;
+    }
+
+    /**
+     * Unconditionally sets the topology name.
+     *
+     * @param name the topology name
+     */
+    public void setName(String name) {
+        this.name = name;
+    }
+
+    /**
+     * Sets the topology name only if none is set yet or {@code override} is true;
+     * otherwise logs a warning and keeps the current name.
+     *
+     * @param name the topology name
+     * @param override whether an existing name may be replaced
+     */
+    public void setName(String name, boolean override) {
+        if (this.name == null || override) {
+            this.name = name;
+        } else {
+            LOG.warn("Ignoring attempt to set property 'name' with override == false.");
+        }
+    }
+
+    /**
+     * @return a new list holding the spout definitions in insertion order
+     */
+    public List<SpoutDef> getSpouts() {
+        return new ArrayList<SpoutDef>(this.spoutMap.values());
+    }
+
+    /**
+     * Replaces all spout definitions, indexing them by id.
+     *
+     * @param spouts the spout definitions; {@code null} is treated as an empty list
+     */
+    public void setSpouts(List<SpoutDef> spouts) {
+        this.spoutMap = new LinkedHashMap<String, SpoutDef>();
+        if (spouts == null) {
+            return;
+        }
+        for (SpoutDef spout : spouts) {
+            this.spoutMap.put(spout.getId(), spout);
+        }
+    }
+
+    /**
+     * @return a new list holding the bolt definitions in insertion order
+     */
+    public List<BoltDef> getBolts() {
+        return new ArrayList<BoltDef>(this.boltMap.values());
+    }
+
+    /**
+     * Replaces all bolt definitions, indexing them by id.
+     *
+     * @param bolts the bolt definitions; {@code null} is treated as an empty list
+     */
+    public void setBolts(List<BoltDef> bolts) {
+        this.boltMap = new LinkedHashMap<String, BoltDef>();
+        if (bolts == null) {
+            return;
+        }
+        for (BoltDef bolt : bolts) {
+            this.boltMap.put(bolt.getId(), bolt);
+        }
+    }
+
+    /**
+     * @return the stream definitions (the internal list, not a copy)
+     */
+    public List<StreamDef> getStreams() {
+        return streams;
+    }
+
+    /**
+     * @param streams the stream definitions
+     */
+    public void setStreams(List<StreamDef> streams) {
+        this.streams = streams;
+    }
+
+    /**
+     * @return the topology config map (the internal map, not a copy)
+     */
+    public Map<String, Object> getConfig() {
+        return config;
+    }
+
+    /**
+     * @param config the topology config map
+     */
+    public void setConfig(Map<String, Object> config) {
+        this.config = config;
+    }
+
+    /**
+     * @return a new list holding the component definitions in insertion order
+     */
+    public List<BeanDef> getComponents() {
+        return new ArrayList<BeanDef>(this.componentMap.values());
+    }
+
+    /**
+     * Replaces all component definitions, indexing them by id.
+     *
+     * @param components the component definitions; {@code null} is treated as an empty list
+     */
+    public void setComponents(List<BeanDef> components) {
+        this.componentMap = new LinkedHashMap<String, BeanDef>();
+        if (components == null) {
+            return;
+        }
+        for (BeanDef component : components) {
+            this.componentMap.put(component.getId(), component);
+        }
+    }
+
+    /**
+     * @return the include definitions, or {@code null} if none were set
+     */
+    public List<IncludeDef> getIncludes() {
+        return includes;
+    }
+
+    /**
+     * @param includes the include definitions
+     */
+    public void setIncludes(List<IncludeDef> includes) {
+        this.includes = includes;
+    }
+
+    // utility methods
+
+    /**
+     * @param boltId the bolt id
+     * @return the parallelism of the bolt registered under {@code boltId}
+     * @throws NullPointerException if no bolt is registered under {@code boltId}
+     */
+    public int parallelismForBolt(String boltId) {
+        return this.boltMap.get(boltId).getParallelism();
+    }
+
+    /**
+     * @param id the bolt id
+     * @return the bolt definition registered under {@code id}, or {@code null}
+     */
+    public BoltDef getBoltDef(String id) {
+        return this.boltMap.get(id);
+    }
+
+    /**
+     * @param id the spout id
+     * @return the spout definition registered under {@code id}, or {@code null}
+     */
+    public SpoutDef getSpoutDef(String id) {
+        return this.spoutMap.get(id);
+    }
+
+    /**
+     * @param id the component id
+     * @return the component definition registered under {@code id}, or {@code null}
+     */
+    public BeanDef getComponent(String id) {
+        return this.componentMap.get(id);
+    }
+
+    // used by includes implementation
+
+    /**
+     * Adds the given bolts. A bolt whose id is already registered is only replaced when
+     * {@code override} is true; otherwise a warning is logged and the existing bolt is kept.
+     *
+     * @param bolts the bolts to add; {@code null} adds nothing
+     * @param override whether existing bolts may be replaced
+     */
+    public void addAllBolts(List<BoltDef> bolts, boolean override) {
+        if (bolts == null) {
+            return;
+        }
+        for (BoltDef bolt : bolts) {
+            String id = bolt.getId();
+            if (this.boltMap.get(id) == null || override) {
+                this.boltMap.put(id, bolt);
+            } else {
+                LOG.warn("Ignoring attempt to create bolt '{}' with override == false.", id);
+            }
+        }
+    }
+
+    /**
+     * Adds the given spouts. A spout whose id is already registered is only replaced when
+     * {@code override} is true; otherwise a warning is logged and the existing spout is kept.
+     *
+     * @param spouts the spouts to add; {@code null} adds nothing
+     * @param override whether existing spouts may be replaced
+     */
+    public void addAllSpouts(List<SpoutDef> spouts, boolean override) {
+        if (spouts == null) {
+            return;
+        }
+        for (SpoutDef spout : spouts) {
+            String id = spout.getId();
+            if (this.spoutMap.get(id) == null || override) {
+                this.spoutMap.put(id, spout);
+            } else {
+                LOG.warn("Ignoring attempt to create spout '{}' with override == false.", id);
+            }
+        }
+    }
+
+    /**
+     * Adds the given components. A component whose id is already registered is only replaced when
+     * {@code override} is true; otherwise a warning is logged and the existing component is kept.
+     *
+     * @param components the components to add; {@code null} adds nothing
+     * @param override whether existing components may be replaced
+     */
+    public void addAllComponents(List<BeanDef> components, boolean override) {
+        if (components == null) {
+            return;
+        }
+        for (BeanDef bean : components) {
+            String id = bean.getId();
+            if (this.componentMap.get(id) == null || override) {
+                this.componentMap.put(id, bean);
+            } else {
+                LOG.warn("Ignoring attempt to create component '{}' with override == false.", id);
+            }
+        }
+    }
+
+    /**
+     * Appends the given streams to the existing ones. The {@code override} flag is currently ignored.
+     *
+     * @param streams the streams to add; {@code null} adds nothing
+     * @param override currently unused
+     */
+    public void addAllStreams(List<StreamDef> streams, boolean override) {
+        //TODO figure out how we want to deal with overrides. Users may want to add streams even when overriding other
+        // properties. For now we just add them blindly which could lead to a potentially invalid topology.
+        if (streams == null) {
+            return;
+        }
+        if (this.streams == null) {
+            // setStreams(null) may have cleared the list; start a fresh one rather than fail
+            this.streams = new ArrayList<StreamDef>();
+        }
+        this.streams.addAll(streams);
+    }
+
+    /**
+     * @return the topology source definition, or {@code null} if not set
+     */
+    public TopologySourceDef getTopologySource() {
+        return topologySource;
+    }
+
+    /**
+     * @param topologySource the topology source definition
+     */
+    public void setTopologySource(TopologySourceDef topologySource) {
+        this.topologySource = topologySource;
+    }
+
+    /**
+     * @return {@code true} if no topology source is set
+     */
+    public boolean isDslTopology() {
+        return this.topologySource == null;
+    }
+
+
+    /**
+     * Checks that a topology source and DSL elements are not both defined.
+     *
+     * @return {@code false} only if a topology source is set and at least one spout, bolt or
+     *         stream is also defined; {@code true} in every other case
+     */
+    public boolean validate() {
+        boolean hasSpouts = this.spoutMap != null && this.spoutMap.size() > 0;
+        boolean hasBolts = this.boltMap != null && this.boltMap.size() > 0;
+        boolean hasStreams = this.streams != null && this.streams.size() > 0;
+        // you cant define a topologySource and a DSL topology at the same time...
+        return isDslTopology() || !(hasSpouts || hasBolts || hasStreams);
+    }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/flux/flux-core/src/main/java/org/apache/storm/flux/model/TopologySourceDef.java
----------------------------------------------------------------------
diff --git a/flux/flux-core/src/main/java/org/apache/storm/flux/model/TopologySourceDef.java b/flux/flux-core/src/main/java/org/apache/storm/flux/model/TopologySourceDef.java
new file mode 100644
index 0000000..d6a2f57
--- /dev/null
+++ b/flux/flux-core/src/main/java/org/apache/storm/flux/model/TopologySourceDef.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.flux.model;
+
+/**
+ * An {@link ObjectDef} that additionally carries a method name, which defaults to
+ * {@link #DEFAULT_METHOD_NAME}.
+ */
+public class TopologySourceDef extends ObjectDef {
+    public static final String DEFAULT_METHOD_NAME = "getTopology";
+
+    // starts out as the default until setMethodName() is called
+    private String methodName = DEFAULT_METHOD_NAME;
+
+    /**
+     * Creates a definition whose method name is {@link #DEFAULT_METHOD_NAME}.
+     */
+    public TopologySourceDef() {
+    }
+
+    /**
+     * @return the method name
+     */
+    public String getMethodName() {
+        return this.methodName;
+    }
+
+    /**
+     * @param methodName the method name
+     */
+    public void setMethodName(String methodName) {
+        this.methodName = methodName;
+    }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/flux/flux-core/src/main/java/org/apache/storm/flux/model/VertexDef.java
----------------------------------------------------------------------
diff --git a/flux/flux-core/src/main/java/org/apache/storm/flux/model/VertexDef.java b/flux/flux-core/src/main/java/org/apache/storm/flux/model/VertexDef.java
new file mode 100644
index 0000000..e71bcc2
--- /dev/null
+++ b/flux/flux-core/src/main/java/org/apache/storm/flux/model/VertexDef.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.flux.model;
+
+/**
+ * Abstract parent class of component definitions
+ * (spouts/bolts).
+ *
+ * Adds a single `parallelism` bean property on top of {@link BeanDef}.
+ */
+public abstract class VertexDef extends BeanDef {
+
+ // default parallelism to 1 so if it's omitted, the topology will still function.
+ private int parallelism = 1;
+
+ /**
+ * @return the configured parallelism (1 unless changed via the setter)
+ */
+ public int getParallelism() {
+ return parallelism;
+ }
+
+ /**
+ * @param parallelism the parallelism to use; stored as given, without validation
+ */
+ public void setParallelism(int parallelism) {
+ this.parallelism = parallelism;
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/flux/flux-core/src/main/java/org/apache/storm/flux/parser/FluxParser.java
----------------------------------------------------------------------
diff --git a/flux/flux-core/src/main/java/org/apache/storm/flux/parser/FluxParser.java b/flux/flux-core/src/main/java/org/apache/storm/flux/parser/FluxParser.java
new file mode 100644
index 0000000..35904a2
--- /dev/null
+++ b/flux/flux-core/src/main/java/org/apache/storm/flux/parser/FluxParser.java
@@ -0,0 +1,206 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.flux.parser;
+
+import org.apache.storm.flux.model.BoltDef;
+import org.apache.storm.flux.model.IncludeDef;
+import org.apache.storm.flux.model.SpoutDef;
+import org.apache.storm.flux.model.TopologyDef;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.yaml.snakeyaml.TypeDescription;
+import org.yaml.snakeyaml.Yaml;
+import org.yaml.snakeyaml.constructor.Constructor;
+
+import java.io.ByteArrayOutputStream;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Map;
+import java.util.Properties;
+
+/**
+ * Static helpers that parse a YAML topology description into a {@link TopologyDef},
+ * optionally applying property/environment substitution and resolving includes.
+ */
+public class FluxParser {
+    private static final Logger LOG = LoggerFactory.getLogger(FluxParser.class);
+
+    private FluxParser(){}
+
+    // TODO refactor input stream processing (see parseResource() method).
+    /**
+     * Parses a topology definition from a file.
+     *
+     * @param inputFile path of the YAML file
+     * @param dumpYaml whether to print the interpreted configuration to standard out
+     * @param processIncludes whether includes should be resolved
+     * @param propertiesFile properties file used for `${key}` substitution, or {@code null} for none
+     * @param envSub whether `${ENV-name}` environment variable substitution is performed
+     * @return the parsed topology definition
+     * @throws IOException if the file (or the properties file) cannot be read
+     */
+    public static TopologyDef parseFile(String inputFile, boolean dumpYaml, boolean processIncludes,
+            String propertiesFile, boolean envSub) throws IOException {
+
+        FileInputStream in = new FileInputStream(inputFile);
+        try {
+            return parseInputStream(in, dumpYaml, processIncludes, propertiesFile, envSub);
+        } finally {
+            // close even when parsing fails so the file handle is not leaked
+            in.close();
+        }
+    }
+
+    /**
+     * Parses a topology definition from a classpath resource.
+     *
+     * @param resource resource name, resolved via {@code FluxParser.class.getResourceAsStream}
+     * @param dumpYaml whether to print the interpreted configuration to standard out
+     * @param processIncludes whether includes should be resolved
+     * @param propertiesFile properties file used for `${key}` substitution, or {@code null} for none
+     * @param envSub whether `${ENV-name}` environment variable substitution is performed
+     * @return the parsed topology definition
+     * @throws IOException if the resource (or the properties file) cannot be read
+     */
+    public static TopologyDef parseResource(String resource, boolean dumpYaml, boolean processIncludes,
+            String propertiesFile, boolean envSub) throws IOException {
+
+        InputStream in = FluxParser.class.getResourceAsStream(resource);
+        try {
+            return parseInputStream(in, dumpYaml, processIncludes, propertiesFile, envSub);
+        } finally {
+            // close even when parsing fails; 'in' is null when the resource does not exist
+            if (in != null) {
+                in.close();
+            }
+        }
+    }
+
+    /**
+     * Parses a topology definition from an input stream. The stream is not closed by this method.
+     *
+     * NOTE: a {@code null} stream is logged and then terminates the JVM via {@code System.exit(1)}.
+     *
+     * @param inputStream stream providing the YAML document
+     * @param dumpYaml whether to print the interpreted configuration to standard out
+     * @param processIncludes whether includes should be resolved
+     * @param propertiesFile properties file used for `${key}` substitution, or {@code null} for none
+     * @param envSub whether `${ENV-name}` environment variable substitution is performed
+     * @return the parsed topology definition
+     * @throws IOException if the stream (or the properties file) cannot be read
+     */
+    public static TopologyDef parseInputStream(InputStream inputStream, boolean dumpYaml, boolean processIncludes,
+            String propertiesFile, boolean envSub) throws IOException {
+
+        Yaml yaml = yaml();
+
+        if (inputStream == null) {
+            LOG.error("Unable to load input stream");
+            System.exit(1);
+        }
+
+        TopologyDef topology = loadYaml(yaml, inputStream, propertiesFile, envSub);
+
+        if (dumpYaml) {
+            dumpYaml(topology, yaml);
+        }
+
+        if (processIncludes) {
+            return processIncludes(yaml, topology, propertiesFile, envSub);
+        } else {
+            return topology;
+        }
+    }
+
+    /**
+     * Reads the whole stream into a string, applies the requested substitutions and
+     * hands the result to the YAML parser.
+     */
+    private static TopologyDef loadYaml(Yaml yaml, InputStream in, String propsFile, boolean envSubstitution) throws IOException {
+        ByteArrayOutputStream bos = new ByteArrayOutputStream();
+        LOG.info("loading YAML from input stream...");
+        // read in chunks rather than one byte per call
+        byte[] buffer = new byte[4096];
+        int read;
+        while ((read = in.read(buffer)) != -1) {
+            bos.write(buffer, 0, read);
+        }
+
+        // TODO substitution implementation is not exactly efficient or kind to memory...
+        // NOTE(review): decodes with the platform default charset -- confirm whether UTF-8 should be forced
+        String str = bos.toString();
+        // properties file substitution
+        if (propsFile != null) {
+            LOG.info("Performing property substitution.");
+            Properties props = new Properties();
+            InputStream propsIn = new FileInputStream(propsFile);
+            try {
+                props.load(propsIn);
+            } finally {
+                // the properties stream is only needed for load(); do not leak it
+                propsIn.close();
+            }
+            for (Object key : props.keySet()) {
+                str = str.replace("${" + key + "}", props.getProperty((String) key));
+            }
+        } else {
+            LOG.info("Not performing property substitution.");
+        }
+
+        // environment variable substitution
+        if (envSubstitution) {
+            LOG.info("Performing environment variable substitution...");
+            for (Map.Entry<String, String> env : System.getenv().entrySet()) {
+                str = str.replace("${ENV-" + env.getKey() + "}", env.getValue());
+            }
+        } else {
+            LOG.info("Not performing environment variable substitution.");
+        }
+        return (TopologyDef) yaml.load(str);
+    }
+
+    /**
+     * Prints the interpreted topology definition to standard out.
+     */
+    private static void dumpYaml(TopologyDef topology, Yaml yaml) {
+        System.out.println("Configuration (interpreted): \n" + yaml.dump(topology));
+    }
+
+    /**
+     * Builds a YAML parser rooted at {@link TopologyDef} with the element types of its
+     * `spouts`, `bolts` and `includes` list properties registered.
+     */
+    private static Yaml yaml() {
+        Constructor constructor = new Constructor(TopologyDef.class);
+
+        TypeDescription topologyDescription = new TypeDescription(TopologyDef.class);
+        topologyDescription.putListPropertyType("spouts", SpoutDef.class);
+        topologyDescription.putListPropertyType("bolts", BoltDef.class);
+        topologyDescription.putListPropertyType("includes", IncludeDef.class);
+        constructor.addTypeDescription(topologyDescription);
+
+        return new Yaml(constructor);
+    }
+
+    /**
+     * Merges every include of the given topology definition into it.
+     *
+     * @param yaml the yaml parser for parsing the include file(s)
+     * @param topologyDef the topology definition containing (possibly zero) includes
+     * @param propsFile properties file used for substitution in the includes, or {@code null}
+     * @param envSub whether environment variable substitution is performed on the includes
+     * @return The TopologyDef with includes resolved.
+     * @throws IOException if an include cannot be read
+     */
+    private static TopologyDef processIncludes(Yaml yaml, TopologyDef topologyDef, String propsFile, boolean envSub)
+            throws IOException {
+        //TODO support multiple levels of includes
+        if (topologyDef.getIncludes() == null) {
+            return topologyDef;
+        }
+        for (IncludeDef include : topologyDef.getIncludes()) {
+            TopologyDef includeTopologyDef;
+            if (include.isResource()) {
+                LOG.info("Loading includes from resource: {}", include.getFile());
+                includeTopologyDef = parseResource(include.getFile(), true, false, propsFile, envSub);
+            } else {
+                LOG.info("Loading includes from file: {}", include.getFile());
+                includeTopologyDef = parseFile(include.getFile(), true, false, propsFile, envSub);
+            }
+            if (includeTopologyDef == null) {
+                // an empty YAML document parses to null; there is nothing to merge
+                LOG.warn("Include '{}' did not contain a topology definition; skipping.", include.getFile());
+                continue;
+            }
+
+            // if overrides are disabled, we won't replace anything that already exists
+            boolean override = include.isOverride();
+            // name
+            if (includeTopologyDef.getName() != null) {
+                topologyDef.setName(includeTopologyDef.getName(), override);
+            }
+
+            // config
+            if (includeTopologyDef.getConfig() != null) {
+                //TODO move this logic to the model class
+                mergeConfig(topologyDef.getConfig(), includeTopologyDef.getConfig(), override);
+            }
+
+            //component overrides
+            if (includeTopologyDef.getComponents() != null) {
+                topologyDef.addAllComponents(includeTopologyDef.getComponents(), override);
+            }
+            //bolt overrides
+            if (includeTopologyDef.getBolts() != null) {
+                topologyDef.addAllBolts(includeTopologyDef.getBolts(), override);
+            }
+            //spout overrides
+            if (includeTopologyDef.getSpouts() != null) {
+                topologyDef.addAllSpouts(includeTopologyDef.getSpouts(), override);
+            }
+            //stream overrides
+            //TODO streams should be uniquely identifiable
+            if (includeTopologyDef.getStreams() != null) {
+                topologyDef.addAllStreams(includeTopologyDef.getStreams(), override);
+            }
+        } // end include processing
+        return topologyDef;
+    }
+
+    /**
+     * Copies the include's config entries into the topology config; existing keys are
+     * only replaced when override is true, otherwise a warning is logged per skipped key.
+     */
+    private static void mergeConfig(Map<String, Object> config, Map<String, Object> includeConfig, boolean override) {
+        if (override) {
+            config.putAll(includeConfig);
+            return;
+        }
+        for (Map.Entry<String, Object> entry : includeConfig.entrySet()) {
+            String key = entry.getKey();
+            if (config.containsKey(key)) {
+                LOG.warn("Ignoring attempt to set topology config property '{}' with override == false", key);
+            } else {
+                config.put(key, entry.getValue());
+            }
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/flux/flux-core/src/main/resources/splash.txt
----------------------------------------------------------------------
diff --git a/flux/flux-core/src/main/resources/splash.txt b/flux/flux-core/src/main/resources/splash.txt
new file mode 100644
index 0000000..337931a
--- /dev/null
+++ b/flux/flux-core/src/main/resources/splash.txt
@@ -0,0 +1,9 @@
+\u2588\u2588\u2588\u2588\u2588\u2588\u2588\u2557\u2588\u2588\u2557 \u2588\u2588\u2557 \u2588\u2588\u2557\u2588\u2588\u2557 \u2588\u2588\u2557
+\u2588\u2588\u2554\u2550\u2550\u2550\u2550\u255d\u2588\u2588\u2551 \u2588\u2588\u2551 \u2588\u2588\u2551\u255a\u2588\u2588\u2557\u2588\u2588\u2554\u255d
+\u2588\u2588\u2588\u2588\u2588\u2557 \u2588\u2588\u2551 \u2588\u2588\u2551 \u2588\u2588\u2551 \u255a\u2588\u2588\u2588\u2554\u255d
+\u2588\u2588\u2554\u2550\u2550\u255d \u2588\u2588\u2551 \u2588\u2588\u2551 \u2588\u2588\u2551 \u2588\u2588\u2554\u2588\u2588\u2557
+\u2588\u2588\u2551 \u2588\u2588\u2588\u2588\u2588\u2588\u2588\u2557\u255a\u2588\u2588\u2588\u2588\u2588\u2588\u2554\u255d\u2588\u2588\u2554\u255d \u2588\u2588\u2557
+\u255a\u2550\u255d \u255a\u2550\u2550\u2550\u2550\u2550\u2550\u255d \u255a\u2550\u2550\u2550\u2550\u2550\u255d \u255a\u2550\u255d \u255a\u2550\u255d
++- Apache Storm -+
++- data FLow User eXperience -+
+Version: ${project.version}
http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/flux/flux-core/src/test/java/org/apache/storm/flux/FluxBuilderTest.java
----------------------------------------------------------------------
diff --git a/flux/flux-core/src/test/java/org/apache/storm/flux/FluxBuilderTest.java b/flux/flux-core/src/test/java/org/apache/storm/flux/FluxBuilderTest.java
new file mode 100644
index 0000000..ff67867
--- /dev/null
+++ b/flux/flux-core/src/test/java/org/apache/storm/flux/FluxBuilderTest.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.flux;
+
+import org.junit.Test;
+import static org.junit.Assert.*;
+
+public class FluxBuilderTest {
+
+    /**
+     * Checks that {@code int} is reported as a primitive number while
+     * {@code boolean} and {@code String} are not.
+     */
+    @Test
+    public void testIsPrimitiveNumber() throws Exception {
+        assertTrue(FluxBuilder.isPrimitiveNumber(int.class));
+
+        // neither a non-numeric primitive nor a reference type may qualify
+        final Class<?>[] nonNumeric = {boolean.class, String.class};
+        for (int i = 0; i < nonNumeric.length; i++) {
+            assertFalse(FluxBuilder.isPrimitiveNumber(nonNumeric[i]));
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/flux/flux-core/src/test/java/org/apache/storm/flux/IntegrationTest.java
----------------------------------------------------------------------
diff --git a/flux/flux-core/src/test/java/org/apache/storm/flux/IntegrationTest.java b/flux/flux-core/src/test/java/org/apache/storm/flux/IntegrationTest.java
new file mode 100644
index 0000000..c5807f8
--- /dev/null
+++ b/flux/flux-core/src/test/java/org/apache/storm/flux/IntegrationTest.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.flux;
+
+import org.junit.Test;
+
+public class IntegrationTest {
+
+    /**
+     * Integration runs are opt-in: the test body is skipped unless the
+     * {@code skipIntegration} system property is set to {@code false}
+     * (compared case-insensitively). An unset property means "skip".
+     */
+    private static boolean skipTest = !"false".equalsIgnoreCase(System.getProperty("skipIntegration"));
+
+    /**
+     * Launches Flux through its command-line entry point against the
+     * existing-topology test configuration, when integration tests are enabled.
+     */
+    @Test
+    public void testRunTopologySource() throws Exception {
+        if (skipTest) {
+            return;
+        }
+        final String[] cliArgs = {"-s", "30000", "src/test/resources/configs/existing-topology.yaml"};
+        Flux.main(cliArgs);
+    }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/flux/flux-core/src/test/java/org/apache/storm/flux/TCKTest.java
----------------------------------------------------------------------
diff --git a/flux/flux-core/src/test/java/org/apache/storm/flux/TCKTest.java b/flux/flux-core/src/test/java/org/apache/storm/flux/TCKTest.java
new file mode 100644
index 0000000..c9227f6
--- /dev/null
+++ b/flux/flux-core/src/test/java/org/apache/storm/flux/TCKTest.java
@@ -0,0 +1,254 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.flux;
+
+import org.apache.storm.Config;
+import org.apache.storm.generated.StormTopology;
+import org.apache.storm.flux.model.ExecutionContext;
+import org.apache.storm.flux.model.TopologyDef;
+import org.apache.storm.flux.parser.FluxParser;
+import org.apache.storm.flux.test.TestBolt;
+import org.junit.Test;
+
+import static org.junit.Assert.*;
+
+/**
+ * Builds topologies from the YAML definitions under {@code /configs} and checks
+ * that each one either yields a valid {@link StormTopology} or fails with the
+ * expected exception.
+ */
+public class TCKTest {
+
+    /**
+     * Parses a classpath YAML resource using the flag values shared by almost every
+     * test here ({@code false, true}), no properties file, and the last flag off.
+     */
+    private static TopologyDef parse(String resource) throws Exception {
+        return FluxParser.parseResource(resource, false, true, null, false);
+    }
+
+    /** Builds the Storm config for a definition and wraps both in an execution context. */
+    private static ExecutionContext newContext(TopologyDef topologyDef) throws Exception {
+        Config conf = FluxBuilder.buildConfig(topologyDef);
+        return new ExecutionContext(topologyDef, conf);
+    }
+
+    /**
+     * Builds the topology for a parsed definition, asserts it is non-null and runs
+     * {@code StormTopology.validate()} on it.
+     *
+     * @return the execution context used for the build, for follow-up assertions
+     */
+    private static ExecutionContext buildAndValidate(TopologyDef topologyDef) throws Exception {
+        ExecutionContext context = newContext(topologyDef);
+        StormTopology topology = FluxBuilder.buildTopology(context);
+        assertNotNull(topology);
+        topology.validate();
+        return context;
+    }
+
+    /** Parses the resource, then builds and validates the resulting topology. */
+    private static ExecutionContext assertBuilds(String resource) throws Exception {
+        return buildAndValidate(parse(resource));
+    }
+
+    /**
+     * Same as {@link #assertBuilds(String)}, but first asserts that the parsed
+     * definition itself passes {@code TopologyDef.validate()}.
+     */
+    private static ExecutionContext assertValidDefinitionBuilds(String resource) throws Exception {
+        TopologyDef topologyDef = parse(resource);
+        assertTrue(topologyDef.validate());
+        return buildAndValidate(topologyDef);
+    }
+
+    @Test
+    public void testTCK() throws Exception {
+        assertBuilds("/configs/tck.yaml");
+    }
+
+    @Test
+    public void testShellComponents() throws Exception {
+        assertBuilds("/configs/shell_test.yaml");
+    }
+
+    @Test(expected = IllegalArgumentException.class)
+    public void testBadShellComponents() throws Exception {
+        assertBuilds("/configs/bad_shell_test.yaml");
+    }
+
+    @Test
+    public void testKafkaSpoutConfig() throws Exception {
+        assertBuilds("/configs/kafka_test.yaml");
+    }
+
+    @Test
+    public void testLoadFromResource() throws Exception {
+        assertBuilds("/configs/kafka_test.yaml");
+    }
+
+    @Test
+    public void testHdfs() throws Exception {
+        assertBuilds("/configs/hdfs_test.yaml");
+    }
+
+    @Test
+    public void testDiamondTopology() throws Exception {
+        assertBuilds("/configs/diamond-topology.yaml");
+    }
+
+    @Test
+    public void testHbase() throws Exception {
+        assertBuilds("/configs/simple_hbase.yaml");
+    }
+
+    @Test(expected = IllegalArgumentException.class)
+    public void testBadHbase() throws Exception {
+        assertBuilds("/configs/bad_hbase.yaml");
+    }
+
+    @Test
+    public void testIncludes() throws Exception {
+        TopologyDef topologyDef = parse("/configs/include_test.yaml");
+        buildAndValidate(topologyDef);
+        // assertEquals reports expected vs. actual and does not NPE on a null name
+        assertEquals("include-topology", topologyDef.getName());
+        assertTrue(topologyDef.getBolts().size() > 0);
+        assertTrue(topologyDef.getSpouts().size() > 0);
+    }
+
+    @Test
+    public void testTopologySource() throws Exception {
+        assertValidDefinitionBuilds("/configs/existing-topology.yaml");
+    }
+
+    @Test
+    public void testTopologySourceWithReflection() throws Exception {
+        assertValidDefinitionBuilds("/configs/existing-topology-reflection.yaml");
+    }
+
+    @Test
+    public void testTopologySourceWithConfigParam() throws Exception {
+        assertValidDefinitionBuilds("/configs/existing-topology-reflection-config.yaml");
+    }
+
+    @Test
+    public void testTopologySourceWithMethodName() throws Exception {
+        assertValidDefinitionBuilds("/configs/existing-topology-method-override.yaml");
+    }
+
+    @Test
+    public void testTridentTopologySource() throws Exception {
+        assertValidDefinitionBuilds("/configs/existing-topology-trident.yaml");
+    }
+
+    @Test(expected = IllegalArgumentException.class)
+    public void testInvalidTopologySource() throws Exception {
+        TopologyDef topologyDef = parse("/configs/invalid-existing-topology.yaml");
+        // the message is shown only when validation unexpectedly succeeds
+        assertFalse("Invalid topology definition unexpectedly passed validation.", topologyDef.validate());
+        // building is expected to throw; the result is deliberately not inspected
+        FluxBuilder.buildTopology(newContext(topologyDef));
+    }
+
+    @Test
+    public void testTopologySourceWithGetMethodName() throws Exception {
+        assertValidDefinitionBuilds("/configs/existing-topology-reflection.yaml");
+    }
+
+    @Test
+    public void testTopologySourceWithConfigMethods() throws Exception {
+        ExecutionContext context = assertValidDefinitionBuilds("/configs/config-methods-test.yaml");
+
+        // make sure the properties were actually set
+        TestBolt bolt = (TestBolt) context.getBolt("bolt-1");
+        assertEquals("foo", bolt.getFoo());
+        assertEquals("bar", bolt.getBar());
+        assertEquals("foobar", bolt.getFooBar());
+        assertArrayEquals(
+                new TestBolt.TestClass[] {
+                    new TestBolt.TestClass("foo"),
+                    new TestBolt.TestClass("bar"),
+                    new TestBolt.TestClass("baz")
+                },
+                bolt.getClasses());
+    }
+
+    @Test
+    public void testVariableSubstitution() throws Exception {
+        // the only test that supplies a properties file and switches the last flag on
+        TopologyDef topologyDef = FluxParser.parseResource("/configs/substitution-test.yaml", false, true, "src/test/resources/configs/test.properties", true);
+        assertTrue(topologyDef.validate());
+        ExecutionContext context = buildAndValidate(topologyDef);
+
+        // test basic substitution
+        assertEquals("Property not replaced.",
+                "substitution-topology",
+                context.getTopologyDef().getName());
+
+        // test environment variable substitution
+        // $PATH should be defined on most systems
+        String envPath = System.getenv().get("PATH");
+        assertEquals("ENV variable not replaced.",
+                envPath,
+                context.getTopologyDef().getConfig().get("test.env.value"));
+    }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/flux/flux-core/src/test/java/org/apache/storm/flux/multilang/MultilangEnvirontmentTest.java
----------------------------------------------------------------------
diff --git a/flux/flux-core/src/test/java/org/apache/storm/flux/multilang/MultilangEnvirontmentTest.java b/flux/flux-core/src/test/java/org/apache/storm/flux/multilang/MultilangEnvirontmentTest.java
new file mode 100644
index 0000000..dcded17
--- /dev/null
+++ b/flux/flux-core/src/test/java/org/apache/storm/flux/multilang/MultilangEnvirontmentTest.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.flux.multilang;
+
+
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.ByteArrayOutputStream;
+import java.io.InputStream;
+import java.io.OutputStream;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Sanity checks to make sure we can at least invoke the shells used.
+ */
+public class MultilangEnvirontmentTest {
+    private static final Logger LOG = LoggerFactory.getLogger(MultilangEnvirontmentTest.class);
+
+    /** Runs {@code python --version} and expects a zero exit code. */
+    @Test
+    public void testInvokePython() throws Exception {
+        String[] command = new String[]{"python", "--version"};
+        int exitVal = invokeCommand(command);
+        // the message is printed on failure, so it states the expectation
+        assertEquals("Exit value for python should be 0.", 0, exitVal);
+    }
+
+    /** Runs {@code node --version} and expects a zero exit code. */
+    @Test
+    public void testInvokeNode() throws Exception {
+        String[] command = new String[]{"node", "--version"};
+        int exitVal = invokeCommand(command);
+        assertEquals("Exit value for node should be 0.", 0, exitVal);
+    }
+
+    /**
+     * Copies everything readable from one stream into another and then closes
+     * both streams, whether or not the copy succeeded.
+     */
+    private static class StreamRedirect implements Runnable {
+        private final InputStream in;
+        private final OutputStream out;
+
+        public StreamRedirect(InputStream in, OutputStream out) {
+            this.in = in;
+            this.out = out;
+        }
+
+        @Override
+        public void run() {
+            try {
+                try {
+                    byte[] buffer = new byte[1024];
+                    int read;
+                    while ((read = this.in.read(buffer)) != -1) {
+                        this.out.write(buffer, 0, read);
+                    }
+                } finally {
+                    // close both ends even if the copy, or the first close, fails
+                    try {
+                        this.in.close();
+                    } finally {
+                        this.out.close();
+                    }
+                }
+            } catch (Exception e) {
+                LOG.warn("Failed to redirect process output", e);
+            }
+        }
+    }
+
+    /**
+     * Starts the given command with stderr merged into stdout, drains its output on
+     * a helper thread, and waits for the process to finish.
+     *
+     * @param args the command and its arguments
+     * @return the exit value of the process
+     * @throws Exception if the process cannot be started or the wait is interrupted
+     */
+    private int invokeCommand(String[] args) throws Exception {
+        // cast so the array is logged as one argument instead of being spread as varargs
+        LOG.debug("Invoking command: {}", (Object) args);
+
+        ProcessBuilder pb = new ProcessBuilder(args);
+        pb.redirectErrorStream(true);
+        final Process proc = pb.start();
+
+        ByteArrayOutputStream out = new ByteArrayOutputStream();
+        Thread t = new Thread(new StreamRedirect(proc.getInputStream(), out));
+        t.start();
+        int exitVal = proc.waitFor();
+        // wait for the copier to drain the pipe so the logged output is complete
+        t.join();
+        LOG.debug("Command result: {}", out.toString());
+        return exitVal;
+    }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/flux/flux-core/src/test/java/org/apache/storm/flux/test/SimpleTopology.java
----------------------------------------------------------------------
diff --git a/flux/flux-core/src/test/java/org/apache/storm/flux/test/SimpleTopology.java b/flux/flux-core/src/test/java/org/apache/storm/flux/test/SimpleTopology.java
new file mode 100644
index 0000000..ff65a8a
--- /dev/null
+++ b/flux/flux-core/src/test/java/org/apache/storm/flux/test/SimpleTopology.java
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.flux.test;
+
+import org.apache.storm.generated.StormTopology;
+import org.apache.storm.topology.TopologyBuilder;
+import org.apache.storm.flux.wrappers.bolts.LogInfoBolt;
+import org.apache.storm.flux.wrappers.spouts.FluxShellSpout;
+
+import java.util.Map;
+
+/**
+ * Test topology source that does not implement TopologySource, but has the same
+ * `getTopology()` method.
+ */
+public class SimpleTopology {
+
+    /** Creates an instance; there is no state to initialize. */
+    public SimpleTopology() {
+    }
+
+    /** Two-argument constructor; both arguments are ignored. */
+    public SimpleTopology(String foo, String bar) {
+    }
+
+    /**
+     * Alternate entry point with a non-default method name; it simply delegates
+     * to {@link #getTopology(Map)}.
+     */
+    public StormTopology getTopologyWithDifferentMethodName(Map<String, Object> config) {
+        return getTopology(config);
+    }
+
+    /**
+     * Builds a two-component topology: a shell spout feeding a logging bolt
+     * through a shuffle grouping.
+     *
+     * @param config not read by this implementation
+     * @return the assembled topology
+     */
+    public StormTopology getTopology(Map<String, Object> config) {
+        final String[] shellCommand = {"node", "randomsentence.js"};
+        final String[] outputFields = {"word"};
+
+        TopologyBuilder topologyBuilder = new TopologyBuilder();
+
+        // spouts
+        topologyBuilder.setSpout("sentence-spout", new FluxShellSpout(shellCommand, outputFields), 1);
+
+        // bolts
+        topologyBuilder.setBolt("log-bolt", new LogInfoBolt(), 1).shuffleGrouping("sentence-spout");
+
+        return topologyBuilder.createTopology();
+    }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/flux/flux-core/src/test/java/org/apache/storm/flux/test/SimpleTopologySource.java
----------------------------------------------------------------------
diff --git a/flux/flux-core/src/test/java/org/apache/storm/flux/test/SimpleTopologySource.java b/flux/flux-core/src/test/java/org/apache/storm/flux/test/SimpleTopologySource.java
new file mode 100644
index 0000000..2fadacf
--- /dev/null
+++ b/flux/flux-core/src/test/java/org/apache/storm/flux/test/SimpleTopologySource.java
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.flux.test;
+
+import org.apache.storm.generated.StormTopology;
+import org.apache.storm.topology.TopologyBuilder;
+import org.apache.storm.flux.api.TopologySource;
+import org.apache.storm.flux.wrappers.bolts.LogInfoBolt;
+import org.apache.storm.flux.wrappers.spouts.FluxShellSpout;
+
+import java.util.Map;
+
+public class SimpleTopologySource implements TopologySource {
+
+
+ public SimpleTopologySource(){}
+
+ public SimpleTopologySource(String foo, String bar){}
+
+
+ @Override
+ public StormTopology getTopology(Map<String, Object> config) {
+ TopologyBuilder builder = new TopologyBuilder();
+
+ // spouts
+ FluxShellSpout spout = new FluxShellSpout(
+ new String[]{"node", "randomsentence.js"},
+ new String[]{"word"});
+ builder.setSpout("sentence-spout", spout, 1);
+
+ // bolts
+ builder.setBolt("log-bolt", new LogInfoBolt(), 1)
+ .shuffleGrouping("sentence-spout");
+
+ return builder.createTopology();
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/flux/flux-core/src/test/java/org/apache/storm/flux/test/SimpleTopologyWithConfigParam.java
----------------------------------------------------------------------
diff --git a/flux/flux-core/src/test/java/org/apache/storm/flux/test/SimpleTopologyWithConfigParam.java b/flux/flux-core/src/test/java/org/apache/storm/flux/test/SimpleTopologyWithConfigParam.java
new file mode 100644
index 0000000..78195b5
--- /dev/null
+++ b/flux/flux-core/src/test/java/org/apache/storm/flux/test/SimpleTopologyWithConfigParam.java
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.flux.test;
+
+import org.apache.storm.Config;
+import org.apache.storm.generated.StormTopology;
+import org.apache.storm.topology.TopologyBuilder;
+import org.apache.storm.flux.wrappers.bolts.LogInfoBolt;
+import org.apache.storm.flux.wrappers.spouts.FluxShellSpout;
+
+/**
+ * Test topology source that does not implement TopologySource, but has the same
+ * `getTopology()` method.
+ */
+public class SimpleTopologyWithConfigParam {
+
+    /** Creates an instance; there is no state to initialize. */
+    public SimpleTopologyWithConfigParam() {
+    }
+
+    /** Two-argument constructor; both arguments are ignored. */
+    public SimpleTopologyWithConfigParam(String foo, String bar) {
+    }
+
+    /**
+     * Builds the shell-spout-to-log-bolt topology. Unlike the map-based variants,
+     * this method takes a {@link Config} argument.
+     *
+     * @param config not read by this implementation
+     * @return the assembled topology
+     */
+    public StormTopology getTopology(Config config) {
+        final TopologyBuilder topologyBuilder = new TopologyBuilder();
+
+        // spouts
+        final FluxShellSpout sentenceSpout =
+                new FluxShellSpout(new String[]{"node", "randomsentence.js"}, new String[]{"word"});
+        topologyBuilder.setSpout("sentence-spout", sentenceSpout, 1);
+
+        // bolts
+        topologyBuilder
+                .setBolt("log-bolt", new LogInfoBolt(), 1)
+                .shuffleGrouping("sentence-spout");
+
+        return topologyBuilder.createTopology();
+    }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/flux/flux-core/src/test/java/org/apache/storm/flux/test/TestBolt.java
----------------------------------------------------------------------
diff --git a/flux/flux-core/src/test/java/org/apache/storm/flux/test/TestBolt.java b/flux/flux-core/src/test/java/org/apache/storm/flux/test/TestBolt.java
new file mode 100644
index 0000000..28d11b6
--- /dev/null
+++ b/flux/flux-core/src/test/java/org/apache/storm/flux/test/TestBolt.java
@@ -0,0 +1,131 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.flux.test;
+
+import org.apache.storm.topology.BasicOutputCollector;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.topology.base.BaseBasicBolt;
+import org.apache.storm.tuple.Tuple;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Serializable;
+
+
+/**
+ * Bolt fixture offering several constructor shapes and "with*" configuration
+ * methods, plus getters so tests can check which values were applied.
+ */
+public class TestBolt extends BaseBasicBolt {
+    private static final Logger LOG = LoggerFactory.getLogger(TestBolt.class);
+
+    private String foo;
+    private String bar;
+    private String fooBar;
+    private String none;
+    private TestClass[] classes;
+
+    /** Serializable value holder whose equality is based solely on its single field. */
+    public static class TestClass implements Serializable {
+        private String field;
+
+        public TestClass(String field) {
+            this.field = field;
+        }
+
+        public String getField() {
+            return field;
+        }
+
+        /** Two instances are equal when their fields are both null or equal strings. */
+        @Override
+        public boolean equals(Object o) {
+            if (o == this) {
+                return true;
+            }
+            if (!(o instanceof TestClass)) {
+                return false;
+            }
+            String mine = getField();
+            String theirs = ((TestClass) o).getField();
+            if (mine == null) {
+                return theirs == null;
+            }
+            return mine.equals(theirs);
+        }
+
+        /** Hash of the field, or 0 when the field is null. */
+        @Override
+        public int hashCode() {
+            String value = getField();
+            return value == null ? 0 : value.hashCode();
+        }
+    }
+
+    /** Enum type accepted by every constructor. */
+    public enum TestEnum {
+        FOO,
+        BAR
+    }
+
+    // The constructors differ only in signature; none of them stores its arguments.
+
+    public TestBolt(TestEnum te) {
+    }
+
+    public TestBolt(TestEnum te, float f) {
+    }
+
+    public TestBolt(TestEnum te, float f, boolean b) {
+    }
+
+    public TestBolt(TestEnum te, float f, boolean b, TestClass... str) {
+    }
+
+    /** Logs each incoming tuple at INFO level; nothing is emitted. */
+    @Override
+    public void execute(Tuple tuple, BasicOutputCollector basicOutputCollector) {
+        LOG.info("{}", tuple);
+    }
+
+    /** Declares no output fields. */
+    @Override
+    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
+    }
+
+    // config methods
+
+    /** Stores the given value as {@code foo}. */
+    public void withFoo(String foo) {
+        this.foo = foo;
+    }
+
+    /** No-argument config method; records that it was called. */
+    public void withNone() {
+        this.none = "hit";
+    }
+
+    /** Stores the given value as {@code bar}. */
+    public void withBar(String bar) {
+        this.bar = bar;
+    }
+
+    /** Stores the concatenation of both arguments as {@code fooBar}. */
+    public void withFooBar(String foo, String bar) {
+        this.fooBar = foo + bar;
+    }
+
+    /** Stores the given varargs array as-is (no copy is made). */
+    public void withClasses(TestClass... classes) {
+        this.classes = classes;
+    }
+
+    // accessors used by tests
+
+    public String getFoo() {
+        return this.foo;
+    }
+
+    public String getBar() {
+        return this.bar;
+    }
+
+    public String getFooBar() {
+        return this.fooBar;
+    }
+
+    public TestClass[] getClasses() {
+        return classes;
+    }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/flux/flux-core/src/test/java/org/apache/storm/flux/test/TridentTopologySource.java
----------------------------------------------------------------------
diff --git a/flux/flux-core/src/test/java/org/apache/storm/flux/test/TridentTopologySource.java b/flux/flux-core/src/test/java/org/apache/storm/flux/test/TridentTopologySource.java
new file mode 100644
index 0000000..36b272b
--- /dev/null
+++ b/flux/flux-core/src/test/java/org/apache/storm/flux/test/TridentTopologySource.java
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.flux.test;
+
+import org.apache.storm.Config;
+import org.apache.storm.generated.StormTopology;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Values;
+import org.apache.storm.kafka.StringScheme;
+import org.apache.storm.trident.TridentTopology;
+import org.apache.storm.trident.operation.BaseFunction;
+import org.apache.storm.trident.operation.TridentCollector;
+import org.apache.storm.trident.operation.builtin.Count;
+import org.apache.storm.trident.testing.FixedBatchSpout;
+import org.apache.storm.trident.testing.MemoryMapState;
+import org.apache.storm.trident.tuple.TridentTuple;
+
+/**
+ * Basic Trident example that will return a `StormTopology` from a `getTopology()` method.
+ */
+public class TridentTopologySource {
+
+ private FixedBatchSpout spout;
+
+ public StormTopology getTopology(Config config) {
+
+ this.spout = new FixedBatchSpout(new Fields("sentence"), 20,
+ new Values("one two"),
+ new Values("two three"),
+ new Values("three four"),
+ new Values("four five"),
+ new Values("five six")
+ );
+
+
+ TridentTopology trident = new TridentTopology();
+
+ trident.newStream("wordcount", spout).name("sentence").parallelismHint(1).shuffle()
+ .each(new Fields("sentence"), new Split(), new Fields("word"))
+ .parallelismHint(1)
+ .groupBy(new Fields("word"))
+ .persistentAggregate(new MemoryMapState.Factory(), new Count(), new Fields("count"))
+ .parallelismHint(1);
+ return trident.build();
+ }
+
+ public static class Split extends BaseFunction {
+ @Override
+ public void execute(TridentTuple tuple, TridentCollector collector) {
+ String sentence = tuple.getString(0);
+ for (String word : sentence.split(" ")) {
+ collector.emit(new Values(word));
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/flux/flux-core/src/test/resources/configs/bad_hbase.yaml
----------------------------------------------------------------------
diff --git a/flux/flux-core/src/test/resources/configs/bad_hbase.yaml b/flux/flux-core/src/test/resources/configs/bad_hbase.yaml
new file mode 100644
index 0000000..a29e314
--- /dev/null
+++ b/flux/flux-core/src/test/resources/configs/bad_hbase.yaml
@@ -0,0 +1,98 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# Test that an invalid component constructor argument (see "counterFields" below) is rejected
+---
+
+# topology definition
+# name to be used when submitting
+name: "hbase-wordcount"
+
+# Components
+# Components are analogous to Spring beans. They are meant to be used as constructor,
+# property(setter), and builder arguments.
+#
+# for the time being, components must be declared in the order they are referenced
+
+components:
+ - id: "columnFields"
+ className: "org.apache.storm.tuple.Fields"
+ constructorArgs:
+ - ["word"]
+
+ - id: "counterFields"
+ className: "org.apache.storm.tuple.Fields"
+ constructorArgs:
+ # !!! the following won't work, and should throw an IllegalArgumentException...
+ - "count"
+
+ - id: "mapper"
+ className: "org.apache.storm.hbase.bolt.mapper.SimpleHBaseMapper"
+ configMethods:
+ - name: "withRowKeyField"
+ args: ["word"]
+ - name: "withColumnFields"
+ args: [ref: "columnFields"]
+ - name: "withCounterFields"
+ args: [ref: "counterFields"]
+ - name: "withColumnFamily"
+ args: ["cf"]
+
+# topology configuration
+# this will be passed to the submitter as a map of config options
+#
+config:
+ topology.workers: 1
+ hbase.conf:
+ hbase.rootdir: "hdfs://hadoop:54310/hbase"
+ hbase.zookeeper.quorum: "hadoop"
+
+# spout definitions
+spouts:
+ - id: "word-spout"
+ className: "org.apache.storm.testing.TestWordSpout"
+ parallelism: 1
+
+# bolt definitions
+
+bolts:
+ - id: "count-bolt"
+ className: "org.apache.storm.testing.TestWordCounter"
+
+ - id: "hbase-bolt"
+ className: "org.apache.storm.hbase.bolt.HBaseBolt"
+ constructorArgs:
+ - "WordCount" # HBase table name
+ - ref: "mapper"
+ configMethods:
+ - name: "withConfigKey"
+ args: ["hbase.conf"]
+ parallelism: 1
+
+
+streams:
+ - name: "" # name isn't used (placeholder for logging, UI, etc.)
+ from: "word-spout"
+ to: "count-bolt"
+ grouping:
+ type: SHUFFLE
+
+ - name: "" # name isn't used (placeholder for logging, UI, etc.)
+ from: "count-bolt"
+ to: "hbase-bolt"
+ grouping:
+ type: FIELDS
+ args: ["word"]
http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/flux/flux-core/src/test/resources/configs/bad_shell_test.yaml
----------------------------------------------------------------------
diff --git a/flux/flux-core/src/test/resources/configs/bad_shell_test.yaml b/flux/flux-core/src/test/resources/configs/bad_shell_test.yaml
new file mode 100644
index 0000000..0892ce7
--- /dev/null
+++ b/flux/flux-core/src/test/resources/configs/bad_shell_test.yaml
@@ -0,0 +1,118 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# Test ability to wire together shell spouts/bolts
+---
+
+# topology definition
+# name to be used when submitting
+name: "shell-topology"
+
+# Components
+# Components are analogous to Spring beans. They are meant to be used as constructor,
+# property(setter), and builder arguments.
+#components:
+# - id: "myComponent"
+# className: "com.foo.bar.MyComponent"
+# constructorArgs:
+# - ...
+# properties:
+# foo: "bar"
+# bar: "foo"
+
+# NOTE: We may want to consider some level of spring integration. For example, allowing component references
+# to a spring `ApplicationContext`.
+
+# topology configuration
+# this will be passed to the submitter as a map of config options
+#
+config:
+ topology.workers: 1
+ # ...
+
+# spout definitions
+spouts:
+ - id: "sentence-spout"
+ className: "org.apache.storm.flux.wrappers.spouts.FluxShellSpout"
+ # shell spout constructor takes 2 arguments: String[], String[]
+ constructorArgs:
+ # command line
+ - ["node", "randomsentence.js"]
+ # output fields
+ - ["word"]
+ configMethods:
+ - name: "addComponentConfig"
+ args: ["rabbitmq.configfile", "etc/rabbit.yml", "hello"]
+ - name: "addComponentConfig"
+ args:
+ - "publisher.data_paths"
+ - ["actions", "hello"]
+ parallelism: 1
+ # ...
+
+# bolt definitions
+bolts:
+ - id: "splitsentence"
+ className: "org.apache.storm.flux.wrappers.bolts.FluxShellBolt"
+ constructorArgs:
+ # command line
+ - ["python", "splitsentence.py"]
+ # output fields
+ - ["word"]
+ configMethods:
+ - name: "addComponentConfig"
+ args: ["rabbitmq.configfile", "etc/rabbit.yml", "hello"]
+ - name: "addComponentConfig"
+ args:
+ - "publisher.data_paths"
+ - ["actions", "hello"]
+ parallelism: 1
+ # ...
+
+ - id: "log"
+ className: "org.apache.storm.flux.wrappers.bolts.LogInfoBolt"
+ parallelism: 1
+ # ...
+
+ - id: "count"
+ className: "org.apache.storm.testing.TestWordCounter"
+ parallelism: 1
+ # ...
+
+#stream definitions
+# stream definitions define connections between spouts and bolts.
+# note that such connections can be cyclical
+# custom stream groupings are also supported
+
+streams:
+ - name: "spout --> split" # name isn't used (placeholder for logging, UI, etc.)
+ from: "sentence-spout"
+ to: "splitsentence"
+ grouping:
+ type: SHUFFLE
+
+ - name: "split --> count"
+ from: "splitsentence"
+ to: "count"
+ grouping:
+ type: FIELDS
+ args: ["word"]
+
+ - name: "count --> log"
+ from: "count"
+ to: "log"
+ grouping:
+ type: SHUFFLE
http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/flux/flux-core/src/test/resources/configs/config-methods-test.yaml
----------------------------------------------------------------------
diff --git a/flux/flux-core/src/test/resources/configs/config-methods-test.yaml b/flux/flux-core/src/test/resources/configs/config-methods-test.yaml
new file mode 100644
index 0000000..7c4ffb3
--- /dev/null
+++ b/flux/flux-core/src/test/resources/configs/config-methods-test.yaml
@@ -0,0 +1,92 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+---
+name: "yaml-topology"
+
+#
+config:
+ topology.workers: 1
+ # ...
+
+components:
+ - id: "foo"
+ className: "org.apache.storm.flux.test.TestBolt$TestClass"
+ constructorArgs:
+ - "foo"
+ - id: "bar"
+ className: "org.apache.storm.flux.test.TestBolt$TestClass"
+ constructorArgs:
+ - "bar"
+ - id: "baz"
+ className: "org.apache.storm.flux.test.TestBolt$TestClass"
+ constructorArgs:
+ - "baz"
+
+# spout definitions
+spouts:
+ - id: "spout-1"
+ className: "org.apache.storm.testing.TestWordSpout"
+ parallelism: 1
+ # ...
+
+# bolt definitions
+bolts:
+ - id: "bolt-1"
+ className: "org.apache.storm.flux.test.TestBolt"
+ parallelism: 1
+ constructorArgs:
+ - FOO # enum class
+ - 1.0
+ - true
+ - reflist: ["foo", "bar"]
+ configMethods:
+ - name: "withFoo"
+ args:
+ - "foo"
+ - name: "withNone"
+ - name: "withBar"
+ args:
+ - "bar"
+ - name: "withFooBar"
+ args:
+ - "foo"
+ - "bar"
+ - name: "withClasses"
+ args:
+ - reflist:
+ - "foo"
+ - "bar"
+ - "baz"
+
+
+#stream definitions
+# stream definitions define connections between spouts and bolts.
+# note that such connections can be cyclical
+streams:
+ - name: "spout-1 --> bolt-1" # name isn't used (placeholder for logging, UI, etc.)
+# id: "connection-1"
+ from: "spout-1"
+ to: "bolt-1"
+ grouping:
+ type: SHUFFLE
+
+
+
+
+
+
+
+
http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/flux/flux-core/src/test/resources/configs/diamond-topology.yaml
----------------------------------------------------------------------
diff --git a/flux/flux-core/src/test/resources/configs/diamond-topology.yaml b/flux/flux-core/src/test/resources/configs/diamond-topology.yaml
new file mode 100644
index 0000000..957c258
--- /dev/null
+++ b/flux/flux-core/src/test/resources/configs/diamond-topology.yaml
@@ -0,0 +1,87 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+---
+
+# topology definition
+# name to be used when submitting
+name: "diamond-topology"
+
+# topology configuration
+# this will be passed to the submitter as a map of config options
+#
+config:
+ topology.workers: 1
+
+# spout definitions
+spouts:
+ - id: "spout-1"
+ className: "org.apache.storm.testing.TestWordSpout"
+ parallelism: 1
+
+# bolt definitions
+bolts:
+ - id: "A"
+ className: "org.apache.storm.flux.wrappers.bolts.LogInfoBolt"
+ parallelism: 1
+
+ - id: "B"
+ className: "org.apache.storm.flux.wrappers.bolts.LogInfoBolt"
+ parallelism: 1
+
+ - id: "C"
+ className: "org.apache.storm.flux.wrappers.bolts.LogInfoBolt"
+ parallelism: 1
+
+ - id: "D"
+ className: "org.apache.storm.flux.wrappers.bolts.LogInfoBolt"
+ parallelism: 1
+
+#stream definitions
+# stream definitions define connections between spouts and bolts.
+# note that such connections can be cyclical
+streams:
+ - name: "spout-1 --> bolt-1" # name isn't used (placeholder for logging, UI, etc.)
+# id: "connection-1"
+ from: "spout-1"
+ to: "A"
+ grouping:
+ type: FIELDS
+ args: ["word"]
+
+ - from: "A"
+ to: "B"
+ grouping:
+ type: SHUFFLE
+
+ - from: "A"
+ to: "C"
+ grouping:
+ type: SHUFFLE
+
+ - from: "C"
+ to: "D"
+ grouping:
+ type: SHUFFLE
+
+ - from: "B"
+ to: "D"
+ grouping:
+ type: SHUFFLE
+
+
+
+
http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/flux/flux-core/src/test/resources/configs/existing-topology-method-override.yaml
----------------------------------------------------------------------
diff --git a/flux/flux-core/src/test/resources/configs/existing-topology-method-override.yaml b/flux/flux-core/src/test/resources/configs/existing-topology-method-override.yaml
new file mode 100644
index 0000000..fceeeed
--- /dev/null
+++ b/flux/flux-core/src/test/resources/configs/existing-topology-method-override.yaml
@@ -0,0 +1,25 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+---
+
+# configuration that uses an existing topology that does not implement TopologySource
+name: "existing-topology"
+topologySource:
+ className: "org.apache.storm.flux.test.SimpleTopology"
+ methodName: "getTopologyWithDifferentMethodName"
+ constructorArgs:
+ - "foo"
+ - "bar"
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/storm/blob/e9d78338/flux/flux-core/src/test/resources/configs/existing-topology-reflection-config.yaml
----------------------------------------------------------------------
diff --git a/flux/flux-core/src/test/resources/configs/existing-topology-reflection-config.yaml b/flux/flux-core/src/test/resources/configs/existing-topology-reflection-config.yaml
new file mode 100644
index 0000000..440fe4d
--- /dev/null
+++ b/flux/flux-core/src/test/resources/configs/existing-topology-reflection-config.yaml
@@ -0,0 +1,24 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+---
+
+# configuration that uses an existing topology that does not implement TopologySource
+name: "existing-topology"
+topologySource:
+ className: "org.apache.storm.flux.test.SimpleTopologyWithConfigParam"
+ constructorArgs:
+ - "foo"
+ - "bar"
\ No newline at end of file