You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by kn...@apache.org on 2016/03/17 21:51:21 UTC
[1/2] storm git commit: Initial changes.
Repository: storm
Updated Branches:
refs/heads/1.x-branch fc64e158f -> 885aaec43
Initial changes.
Signed-off-by: Kyle Nusbaum <Ky...@gmail.com>
Ready for PR
Signed-off-by: Kyle Nusbaum <Ky...@gmail.com>
Addressing comments.
Signed-off-by: Kyle Nusbaum <Ky...@gmail.com>
Addressing Comments.
Signed-off-by: Kyle Nusbaum <Ky...@gmail.com>
adding code documentation explaining math of combining component resources.
Signed-off-by: Kyle Nusbaum <Ky...@gmail.com>
Addressing comments and adding a bit more documentation.
Signed-off-by: Kyle Nusbaum <Ky...@gmail.com>
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/d0d53be9
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/d0d53be9
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/d0d53be9
Branch: refs/heads/1.x-branch
Commit: d0d53be94a967a07ed767ec1c79738649dd70722
Parents: fc64e15
Author: Kyle Nusbaum <Ky...@gmail.com>
Authored: Tue Mar 8 15:46:23 2016 -0600
Committer: Kyle Nusbaum <Ky...@gmail.com>
Committed: Thu Mar 17 15:49:53 2016 -0500
----------------------------------------------------------------------
.../clj/org/apache/storm/trident/testing.clj | 12 +-
.../ComponentConfigurationDeclarer.java | 5 +-
.../apache/storm/topology/ResourceDeclarer.java | 28 +++++
.../jvm/org/apache/storm/trident/Stream.java | 31 +++++-
.../org/apache/storm/trident/TridentState.java | 27 ++++-
.../apache/storm/trident/TridentTopology.java | 91 ++++++++++++++-
.../org/apache/storm/trident/graph/Group.java | 22 +++-
.../operation/DefaultResourceDeclarer.java | 66 +++++++++++
.../trident/operation/ITridentResource.java | 32 ++++++
.../org/apache/storm/trident/planner/Node.java | 5 +-
.../apache/storm/trident/integration_test.clj | 111 ++++++++++++++++---
11 files changed, 390 insertions(+), 40 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/d0d53be9/storm-core/src/clj/org/apache/storm/trident/testing.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/trident/testing.clj b/storm-core/src/clj/org/apache/storm/trident/testing.clj
index 44e5ca9..aafb001 100644
--- a/storm-core/src/clj/org/apache/storm/trident/testing.clj
+++ b/storm-core/src/clj/org/apache/storm/trident/testing.clj
@@ -55,14 +55,14 @@
(.shutdown ~drpc)
))
-(defn with-topology* [cluster topo body-fn]
- (t/submit-local-topology (:nimbus cluster) "tester" {} (.build topo))
+(defn with-topology* [cluster storm-topo body-fn]
+ (t/submit-local-topology (:nimbus cluster) "tester" {} storm-topo)
(body-fn)
- (.killTopologyWithOpts (:nimbus cluster) "tester" (doto (KillOptions.) (.set_wait_secs 0)))
- )
+ (.killTopologyWithOpts (:nimbus cluster) "tester" (doto (KillOptions.) (.set_wait_secs 0))))
-(defmacro with-topology [[cluster topo] & body]
- `(with-topology* ~cluster ~topo (fn [] ~@body)))
+(defmacro with-topology [[cluster topo storm-topo] & body]
+ `(let [~storm-topo (.build ~topo)]
+ (with-topology* ~cluster ~storm-topo (fn [] ~@body))))
(defn bootstrap-imports []
(import 'org.apache.storm.LocalDRPC)
http://git-wip-us.apache.org/repos/asf/storm/blob/d0d53be9/storm-core/src/jvm/org/apache/storm/topology/ComponentConfigurationDeclarer.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/topology/ComponentConfigurationDeclarer.java b/storm-core/src/jvm/org/apache/storm/topology/ComponentConfigurationDeclarer.java
index 328af55..5dc7264 100644
--- a/storm-core/src/jvm/org/apache/storm/topology/ComponentConfigurationDeclarer.java
+++ b/storm-core/src/jvm/org/apache/storm/topology/ComponentConfigurationDeclarer.java
@@ -19,14 +19,11 @@ package org.apache.storm.topology;
import java.util.Map;
-public interface ComponentConfigurationDeclarer<T extends ComponentConfigurationDeclarer> {
+public interface ComponentConfigurationDeclarer<T extends ComponentConfigurationDeclarer> extends ResourceDeclarer<T> {
T addConfigurations(Map<String, Object> conf);
T addConfiguration(String config, Object value);
T setDebug(boolean debug);
T setMaxTaskParallelism(Number val);
T setMaxSpoutPending(Number val);
T setNumTasks(Number val);
- T setMemoryLoad(Number onHeap);
- T setMemoryLoad(Number onHeap, Number offHeap);
- T setCPULoad(Number amount);
}
http://git-wip-us.apache.org/repos/asf/storm/blob/d0d53be9/storm-core/src/jvm/org/apache/storm/topology/ResourceDeclarer.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/topology/ResourceDeclarer.java b/storm-core/src/jvm/org/apache/storm/topology/ResourceDeclarer.java
new file mode 100644
index 0000000..4f648eb
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/topology/ResourceDeclarer.java
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.topology;
+
+/**
+ * Base interface that can be used by anything that wants to mirror the basic API of the
+ * Resource Aware Scheduler (RAS). Trident uses this to allow setting resources in the Stream API.
+ */
+public interface ResourceDeclarer <T extends ResourceDeclarer> {
+ T setMemoryLoad(Number onHeap);
+ T setMemoryLoad(Number onHeap, Number offHeap);
+ T setCPULoad(Number amount);
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/d0d53be9/storm-core/src/jvm/org/apache/storm/trident/Stream.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/trident/Stream.java b/storm-core/src/jvm/org/apache/storm/trident/Stream.java
index d313678..4a51b56 100644
--- a/storm-core/src/jvm/org/apache/storm/trident/Stream.java
+++ b/storm-core/src/jvm/org/apache/storm/trident/Stream.java
@@ -20,6 +20,7 @@ package org.apache.storm.trident;
import org.apache.storm.generated.Grouping;
import org.apache.storm.generated.NullStruct;
import org.apache.storm.grouping.CustomStreamGrouping;
+import org.apache.storm.topology.ResourceDeclarer;
import org.apache.storm.trident.fluent.ChainedAggregatorDeclarer;
import org.apache.storm.trident.fluent.GlobalAggregationScheme;
import org.apache.storm.trident.fluent.GroupedStream;
@@ -90,7 +91,7 @@ import java.util.Comparator;
*
*/
// TODO: need to be able to replace existing fields with the function fields (like Cascading Fields.REPLACE)
-public class Stream implements IAggregatableStream {
+public class Stream implements IAggregatableStream, ResourceDeclarer<Stream> {
Node _node;
TridentTopology _topology;
String _name;
@@ -124,6 +125,34 @@ public class Stream implements IAggregatableStream {
}
/**
+ * Sets the CPU Load resource for the current operation
+ */
+ @Override
+ public Stream setCPULoad(Number load) {
+ _node.setCPULoad(load);
+ return this;
+ }
+
+ /**
+ * Sets the Memory Load resources for the current operation.
+ * The off-heap memory load is set to the configured default.
+ */
+ @Override
+ public Stream setMemoryLoad(Number onHeap) {
+ _node.setMemoryLoad(onHeap);
+ return this;
+ }
+
+ /**
+ * Sets the Memory Load resources for the current operation.
+ */
+ @Override
+ public Stream setMemoryLoad(Number onHeap, Number offHeap) {
+ _node.setMemoryLoad(onHeap, offHeap);
+ return this;
+ }
+
+ /**
* Filters out fields from a stream, resulting in a Stream containing only the fields specified by `keepFields`.
*
* For example, if you had a Stream `mystream` containing the fields `["a", "b", "c","d"]`, calling"
http://git-wip-us.apache.org/repos/asf/storm/blob/d0d53be9/storm-core/src/jvm/org/apache/storm/trident/TridentState.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/trident/TridentState.java b/storm-core/src/jvm/org/apache/storm/trident/TridentState.java
index 7173254..18b60e0 100644
--- a/storm-core/src/jvm/org/apache/storm/trident/TridentState.java
+++ b/storm-core/src/jvm/org/apache/storm/trident/TridentState.java
@@ -17,24 +17,43 @@
*/
package org.apache.storm.trident;
+import org.apache.storm.topology.ResourceDeclarer;
import org.apache.storm.trident.planner.Node;
-public class TridentState {
+public class TridentState implements ResourceDeclarer<TridentState> {
TridentTopology _topology;
Node _node;
-
+
protected TridentState(TridentTopology topology, Node node) {
_topology = topology;
_node = node;
}
-
+
public Stream newValuesStream() {
return new Stream(_topology, _node.name, _node);
}
-
+
public TridentState parallelismHint(int parallelism) {
_node.parallelismHint = parallelism;
return this;
}
+
+ @Override
+ public TridentState setCPULoad(Number load) {
+ _node.setCPULoad(load);
+ return this;
+ }
+
+ @Override
+ public TridentState setMemoryLoad(Number onHeap) {
+ _node.setMemoryLoad(onHeap);
+ return this;
+ }
+
+ @Override
+ public TridentState setMemoryLoad(Number onHeap, Number offHeap) {
+ _node.setMemoryLoad(onHeap, offHeap);
+ return this;
+ }
}
http://git-wip-us.apache.org/repos/asf/storm/blob/d0d53be9/storm-core/src/jvm/org/apache/storm/trident/TridentTopology.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/trident/TridentTopology.java b/storm-core/src/jvm/org/apache/storm/trident/TridentTopology.java
index eb50a10..e0a349b 100644
--- a/storm-core/src/jvm/org/apache/storm/trident/TridentTopology.java
+++ b/storm-core/src/jvm/org/apache/storm/trident/TridentTopology.java
@@ -44,6 +44,7 @@ import org.apache.storm.trident.fluent.UniqueIdGen;
import org.apache.storm.trident.graph.GraphGrouper;
import org.apache.storm.trident.graph.Group;
import org.apache.storm.trident.operation.GroupedMultiReducer;
+import org.apache.storm.trident.operation.ITridentResource;
import org.apache.storm.trident.operation.MultiReducer;
import org.apache.storm.trident.operation.impl.FilterExecutor;
import org.apache.storm.trident.operation.impl.GroupedMultiReducerExecutor;
@@ -394,11 +395,23 @@ public class TridentTopology {
Map<Node, String> spoutIds = genSpoutIds(spoutNodes);
Map<Group, String> boltIds = genBoltIds(mergedGroups);
+ Map defaults = Utils.readDefaultConfig();
+
for(SpoutNode sn: spoutNodes) {
Integer parallelism = parallelisms.get(grouper.nodeGroup(sn));
+
+ Map<String, Number> spoutRes = null;
+ spoutRes = mergeDefaultResources(sn.getResources(), defaults);
+ Number onHeap = spoutRes.get(Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB);
+ Number offHeap = spoutRes.get(Config.TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB);
+ Number cpuLoad = spoutRes.get(Config.TOPOLOGY_COMPONENT_CPU_PCORE_PERCENT);
+
if(sn.type == SpoutNode.SpoutType.DRPC) {
+
builder.setBatchPerTupleSpout(spoutIds.get(sn), sn.streamId,
- (IRichSpout) sn.spout, parallelism, batchGroupMap.get(sn));
+ (IRichSpout) sn.spout, parallelism, batchGroupMap.get(sn))
+ .setMemoryLoad(onHeap, offHeap)
+ .setCPULoad(cpuLoad);
} else {
ITridentSpout s;
if(sn.spout instanceof IBatchSpout) {
@@ -409,16 +422,26 @@ public class TridentTopology {
throw new RuntimeException("Regular rich spouts not supported yet... try wrapping in a RichSpoutBatchExecutor");
// TODO: handle regular rich spout without batches (need lots of updates to support this throughout)
}
- builder.setSpout(spoutIds.get(sn), sn.streamId, sn.txId, s, parallelism, batchGroupMap.get(sn));
+ builder.setSpout(spoutIds.get(sn), sn.streamId, sn.txId, s, parallelism, batchGroupMap.get(sn))
+ .setMemoryLoad(onHeap, offHeap)
+ .setCPULoad(cpuLoad);
}
}
-
+
for(Group g: mergedGroups) {
if(!isSpoutGroup(g)) {
Integer p = parallelisms.get(g);
Map<String, String> streamToGroup = getOutputStreamBatchGroups(g, batchGroupMap);
+ Map<String, Number> groupRes = mergeDefaultResources(g.getResources(), defaults);
+
+ Number onHeap = groupRes.get(Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB);
+ Number offHeap = groupRes.get(Config.TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB);
+ Number cpuLoad = groupRes.get(Config.TOPOLOGY_COMPONENT_CPU_PCORE_PERCENT);
+
BoltDeclarer d = builder.setBolt(boltIds.get(g), new SubtopologyBolt(graph, g.nodes, batchGroupMap), p,
- committerBatches(g, batchGroupMap), streamToGroup);
+ committerBatches(g, batchGroupMap), streamToGroup)
+ .setMemoryLoad(onHeap, offHeap)
+ .setCPULoad(cpuLoad);
Collection<PartitionNode> inputs = uniquedSubscriptions(externalGroupInputs(g));
for(PartitionNode n: inputs) {
Node parent = TridentUtils.getParent(graph, n);
@@ -431,6 +454,64 @@ public class TridentTopology {
return builder.buildTopology();
}
+
+ private static Map<String, Number> mergeDefaultResources(Map<String, Number> res, Map defaultConfig) {
+ Map<String, Number> ret = new HashMap<String, Number>();
+
+ Number onHeapDefault = (Number)defaultConfig.get(Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB);
+ Number offHeapDefault = (Number)defaultConfig.get(Config.TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB);
+ Number cpuLoadDefault = (Number)defaultConfig.get(Config.TOPOLOGY_COMPONENT_CPU_PCORE_PERCENT);
+
+ if(res == null) {
+ ret.put(Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB, onHeapDefault);
+ ret.put(Config.TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB, offHeapDefault);
+ ret.put(Config.TOPOLOGY_COMPONENT_CPU_PCORE_PERCENT, cpuLoadDefault);
+ return ret;
+ }
+
+ Number onHeap = res.get(Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB);
+ Number offHeap = res.get(Config.TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB);
+ Number cpuLoad = res.get(Config.TOPOLOGY_COMPONENT_CPU_PCORE_PERCENT);
+
+ /* We take the max of the default and whatever the user put in here.
+ Each node's resources can be the sum of several operations, so the simplest
+ thing to do is get the max.
+
+ The situation we want to avoid is that the user sets low resources on one
+ node, and when that node is combined with a bunch of others, the sum is still
+ that low resource count. If any component isn't set, we want to use the default.
+
+ Right now, this code does not check that. It just takes the max of the summed
+ up resource counts for simplicity's sake. We could perform some more complicated
+ logic to be more accurate, but the benefits are very small, and only apply to some
+ very odd corner cases. */
+ if(onHeap == null) {
+ onHeap = onHeapDefault;
+ }
+ else {
+ onHeap = Math.max(onHeap.doubleValue(), onHeapDefault.doubleValue());
+ }
+
+ if(offHeap == null) {
+ offHeap = offHeapDefault;
+ }
+ else {
+ offHeap = Math.max(offHeap.doubleValue(), offHeapDefault.doubleValue());
+ }
+
+ if(cpuLoad == null) {
+ cpuLoad = cpuLoadDefault;
+ }
+ else {
+ cpuLoad = Math.max(cpuLoad.doubleValue(), cpuLoadDefault.doubleValue());
+ }
+
+ ret.put(Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB, onHeap);
+ ret.put(Config.TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB, offHeap);
+ ret.put(Config.TOPOLOGY_COMPONENT_CPU_PCORE_PERCENT, cpuLoad);
+
+ return ret;
+ }
private static void completeDRPC(DefaultDirectedGraph<Node, IndexedEdge> graph, Map<String, List<Node>> colocate, UniqueIdGen gen) {
List<Set<Node>> connectedComponents = new ConnectivityInspector<>(graph).connectedSets();
@@ -464,7 +545,7 @@ public class TridentTopology {
}
return ret;
}
-
+
//returns null if it's not a drpc group
private static SpoutNode getDRPCSpoutNode(Collection<Node> g) {
for(Node n: g) {
http://git-wip-us.apache.org/repos/asf/storm/blob/d0d53be9/storm-core/src/jvm/org/apache/storm/trident/graph/Group.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/trident/graph/Group.java b/storm-core/src/jvm/org/apache/storm/trident/graph/Group.java
index ef1399b..2c92304 100644
--- a/storm-core/src/jvm/org/apache/storm/trident/graph/Group.java
+++ b/storm-core/src/jvm/org/apache/storm/trident/graph/Group.java
@@ -18,17 +18,20 @@
package org.apache.storm.trident.graph;
import java.util.Arrays;
+import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
+import java.util.Map;
import java.util.Set;
import java.util.UUID;
import org.jgrapht.DirectedGraph;
+import org.apache.storm.trident.operation.ITridentResource;
import org.apache.storm.trident.planner.Node;
import org.apache.storm.trident.util.IndexedEdge;
import org.apache.storm.trident.util.TridentUtils;
-public class Group {
+public class Group implements ITridentResource {
public final Set<Node> nodes = new HashSet<>();
private final DirectedGraph<Node, IndexedEdge> graph;
private final String id = UUID.randomUUID().toString();
@@ -65,6 +68,23 @@ public class Group {
}
@Override
+ public Map<String, Number> getResources() {
+ Map<String, Number> ret = new HashMap<>();
+ for(Node n: nodes) {
+ Map<String, Number> res = n.getResources();
+ for(Map.Entry<String, Number> kv : res.entrySet()) {
+ String key = kv.getKey();
+ Number val = kv.getValue();
+ if(ret.containsKey(key)) {
+ val = new Double(val.doubleValue() + ret.get(key).doubleValue());
+ }
+ ret.put(key, val);
+ }
+ }
+ return ret;
+ }
+
+ @Override
public int hashCode() {
return id.hashCode();
}
http://git-wip-us.apache.org/repos/asf/storm/blob/d0d53be9/storm-core/src/jvm/org/apache/storm/trident/operation/DefaultResourceDeclarer.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/trident/operation/DefaultResourceDeclarer.java b/storm-core/src/jvm/org/apache/storm/trident/operation/DefaultResourceDeclarer.java
new file mode 100644
index 0000000..d49011a
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/trident/operation/DefaultResourceDeclarer.java
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.trident.operation;
+
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.storm.Config;
+import org.apache.storm.utils.Utils;
+import org.apache.storm.topology.ResourceDeclarer;
+
+/**
+ * @param <T> must always be the type of the extending class, i.e.
+ * public class SubResourceDeclarer extends DefaultResourceDeclarer<SubResourceDeclarer> {...}
+ */
+public class DefaultResourceDeclarer<T extends DefaultResourceDeclarer> implements ResourceDeclarer<T>, ITridentResource {
+
+ private Map<String, Number> resources = new HashMap<>();
+ private Map conf = Utils.readStormConfig();
+
+ @Override
+ public T setMemoryLoad(Number onHeap) {
+ return setMemoryLoad(onHeap, Utils.getDouble(conf.get(Config.TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB)));
+ }
+
+ @Override
+ public T setMemoryLoad(Number onHeap, Number offHeap) {
+ if (onHeap != null) {
+ onHeap = onHeap.doubleValue();
+ resources.put(Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB, onHeap);
+ }
+ if (offHeap!=null) {
+ offHeap = offHeap.doubleValue();
+ resources.put(Config.TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB, offHeap);
+ }
+ return (T)this;
+ }
+
+ @Override
+ public T setCPULoad(Number amount) {
+ if(amount != null) {
+ amount = amount.doubleValue();
+ resources.put(Config.TOPOLOGY_COMPONENT_CPU_PCORE_PERCENT, amount);
+ }
+ return (T)this;
+ }
+
+ @Override
+ public Map<String, Number> getResources() {
+ return new HashMap<String, Number>(resources);
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/d0d53be9/storm-core/src/jvm/org/apache/storm/trident/operation/ITridentResource.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/trident/operation/ITridentResource.java b/storm-core/src/jvm/org/apache/storm/trident/operation/ITridentResource.java
new file mode 100644
index 0000000..b3e10ef
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/trident/operation/ITridentResource.java
@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.trident.operation;
+
+import java.util.Map;
+
+/**
+ * This interface is implemented by various Trident classes in order to
+ * gather and propagate resources that have been set on them.
+ * @see ResourceDeclarer
+ */
+public interface ITridentResource {
+ /**
+ * @return a map of resource name -> amount of that resource. *Return should never be null!*
+ */
+ Map<String, Number> getResources();
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/d0d53be9/storm-core/src/jvm/org/apache/storm/trident/planner/Node.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/trident/planner/Node.java b/storm-core/src/jvm/org/apache/storm/trident/planner/Node.java
index 64d8a3b..b2466e6 100644
--- a/storm-core/src/jvm/org/apache/storm/trident/planner/Node.java
+++ b/storm-core/src/jvm/org/apache/storm/trident/planner/Node.java
@@ -17,6 +17,7 @@
*/
package org.apache.storm.trident.planner;
+import org.apache.storm.trident.operation.DefaultResourceDeclarer;
import org.apache.storm.tuple.Fields;
import java.io.Serializable;
import java.util.UUID;
@@ -25,7 +26,7 @@ import org.apache.commons.lang.builder.ToStringBuilder;
import org.apache.commons.lang.builder.ToStringStyle;
-public class Node implements Serializable {
+public class Node extends DefaultResourceDeclarer<Node> implements Serializable {
private static final AtomicInteger INDEX = new AtomicInteger(0);
private String nodeId;
@@ -62,6 +63,4 @@ public class Node implements Serializable {
public String toString() {
return ToStringBuilder.reflectionToString(this, ToStringStyle.MULTI_LINE_STYLE);
}
-
-
}
http://git-wip-us.apache.org/repos/asf/storm/blob/d0d53be9/storm-core/test/clj/integration/org/apache/storm/trident/integration_test.clj
----------------------------------------------------------------------
diff --git a/storm-core/test/clj/integration/org/apache/storm/trident/integration_test.clj b/storm-core/test/clj/integration/org/apache/storm/trident/integration_test.clj
index 4c52286..7f81c4b 100644
--- a/storm-core/test/clj/integration/org/apache/storm/trident/integration_test.clj
+++ b/storm-core/test/clj/integration/org/apache/storm/trident/integration_test.clj
@@ -19,10 +19,13 @@
(:import [org.apache.storm.trident.testing Split CountAsAggregator StringLength TrueFilter
MemoryMapState$Factory])
(:import [org.apache.storm.trident.state StateSpec])
- (:import [org.apache.storm.trident.operation.impl CombinerAggStateUpdater])
- (:use [org.apache.storm.trident testing])
- (:use [org.apache.storm util]))
-
+ (:import [org.apache.storm.trident.operation.impl CombinerAggStateUpdater]
+ [org.apache.storm.trident.operation BaseFunction]
+ [org.json.simple.parser JSONParser]
+ [org.apache.storm Config])
+ (:use [org.apache.storm.trident testing]
+ [org.apache.storm log util config]))
+
(bootstrap-imports)
(deftest test-memory-map-get-tuples
@@ -38,13 +41,13 @@
(.groupBy (fields "word"))
(.persistentAggregate (memory-map-state) (Count.) (fields "count"))
(.parallelismHint 6)
- ))
+ ))
(-> topo
(.newDRPCStream "all-tuples" drpc)
(.broadcast)
(.stateQuery word-counts (fields "args") (TupleCollectionGet.) (fields "word" "count"))
(.project (fields "word" "count")))
- (with-topology [cluster topo]
+ (with-topology [cluster topo storm-topo]
(feed feeder [["hello the man said"] ["the"]])
(is (= #{["hello" 1] ["said" 1] ["the" 2] ["man" 1]}
(into #{} (exec-drpc drpc "all-tuples" "man"))))
@@ -73,7 +76,7 @@
(.stateQuery word-counts (fields "word") (MapGet.) (fields "count"))
(.aggregate (fields "count") (Sum.) (fields "sum"))
(.project (fields "sum")))
- (with-topology [cluster topo]
+ (with-topology [cluster topo storm-topo]
(feed feeder [["hello the man said"] ["the"]])
(is (= [[2]] (exec-drpc drpc "words" "the")))
(is (= [[1]] (exec-drpc drpc "words" "hello")))
@@ -83,7 +86,7 @@
(is (= [[8]] (exec-drpc drpc "words" "man where you the")))
)))))
-;; this test reproduces a bug where committer spouts freeze processing when
+;; this test reproduces a bug where committer spouts freeze processing when
;; there's at least one repartitioning after the spout
(deftest test-word-count-committer-spout
(t/with-local-cluster [cluster]
@@ -108,7 +111,7 @@
(.stateQuery word-counts (fields "word") (MapGet.) (fields "count"))
(.aggregate (fields "count") (Sum.) (fields "sum"))
(.project (fields "sum")))
- (with-topology [cluster topo]
+ (with-topology [cluster topo storm-topo]
(feed feeder [["hello the man said"] ["the"]])
(is (= [[2]] (exec-drpc drpc "words" "the")))
(is (= [[1]] (exec-drpc drpc "words" "hello")))
@@ -135,13 +138,13 @@
(.aggregate (CountAsAggregator.) (fields "count"))
(.parallelismHint 2) ;;this makes sure batchGlobal is working correctly
(.project (fields "count")))
- (with-topology [cluster topo]
+ (with-topology [cluster topo storm-topo]
(doseq [i (range 100)]
(is (= [[1]] (exec-drpc drpc "numwords" "the"))))
(is (= [[0]] (exec-drpc drpc "numwords" "")))
(is (= [[8]] (exec-drpc drpc "numwords" "1 2 3 4 5 6 7 8")))
)))))
-
+
(deftest test-split-merge
(t/with-local-cluster [cluster]
(with-drpc [drpc]
@@ -158,7 +161,7 @@
(.project (fields "len"))))
(.merge topo [s1 s2])
- (with-topology [cluster topo]
+ (with-topology [cluster topo storm-topo]
(is (t/ms= [[7] ["the"] ["man"]] (exec-drpc drpc "splitter" "the man")))
(is (t/ms= [[5] ["hello"]] (exec-drpc drpc "splitter" "hello")))
)))))
@@ -180,11 +183,11 @@
(.aggregate (CountAsAggregator.) (fields "count"))))
(.merge topo [s1 s2])
- (with-topology [cluster topo]
+ (with-topology [cluster topo storm-topo]
(is (t/ms= [["the" 1] ["the" 1]] (exec-drpc drpc "tester" "the")))
(is (t/ms= [["aaaaa" 1] ["aaaaa" 1]] (exec-drpc drpc "tester" "aaaaa")))
)))))
-
+
(deftest test-multi-repartition
(t/with-local-cluster [cluster]
(with-drpc [drpc]
@@ -196,7 +199,7 @@
(.shuffle)
(.aggregate (CountAsAggregator.) (fields "count"))
))
- (with-topology [cluster topo]
+ (with-topology [cluster topo storm-topo]
(is (t/ms= [[2]] (exec-drpc drpc "tester" "the man")))
(is (t/ms= [[1]] (exec-drpc drpc "tester" "aaa")))
)))))
@@ -270,6 +273,82 @@
(.stateQuery word-counts (fields "word1") (MapGet.) (fields "count"))))))
)))
+
+(deftest test-set-component-resources
+ (t/with-local-cluster [cluster]
+ (with-drpc [drpc]
+ (letlocals
+ (bind topo (TridentTopology.))
+ (bind feeder (feeder-spout ["sentence"]))
+ (bind add-bang (proxy [BaseFunction] []
+ (execute [tuple collector]
+ (. collector emit (str (. tuple getString 0) "!")))))
+ (bind word-counts
+ (.. topo
+ (newStream "words" feeder)
+ (parallelismHint 5)
+ (setCPULoad 20)
+ (setMemoryLoad 512 256)
+ (each (fields "sentence") (Split.) (fields "word"))
+ (setCPULoad 10)
+ (setMemoryLoad 512)
+ (each (fields "word") add-bang (fields "word!"))
+ (parallelismHint 10)
+ (setCPULoad 50)
+ (setMemoryLoad 1024)
+ (groupBy (fields "word!"))
+ (persistentAggregate (memory-map-state) (Count.) (fields "count"))
+ (setCPULoad 100)
+ (setMemoryLoad 2048)))
+ (with-topology [cluster topo storm-topo]
+
+ (let [parse-fn (fn [[k v]]
+ [k (clojurify-structure (. (JSONParser.) parse (.. v get_common get_json_conf)))])
+ json-confs (into {} (map parse-fn (. storm-topo get_bolts)))]
+ (testing "spout memory"
+ (is (= (-> (json-confs "spout-words")
+ (get TOPOLOGY-COMPONENT-RESOURCES-ONHEAP-MEMORY-MB))
+ 512.0))
+
+ (is (= (-> (json-confs "spout-words")
+ (get TOPOLOGY-COMPONENT-RESOURCES-OFFHEAP-MEMORY-MB))
+ 256.0))
+
+ (is (= (-> (json-confs "$spoutcoord-spout-words")
+ (get TOPOLOGY-COMPONENT-RESOURCES-ONHEAP-MEMORY-MB))
+ 512.0))
+
+ (is (= (-> (json-confs "$spoutcoord-spout-words")
+ (get TOPOLOGY-COMPONENT-RESOURCES-OFFHEAP-MEMORY-MB))
+ 256.0)))
+
+ (testing "spout CPU"
+ (is (= (-> (json-confs "spout-words")
+ (get TOPOLOGY-COMPONENT-CPU-PCORE-PERCENT))
+ 20.0))
+
+ (is (= (-> (json-confs "$spoutcoord-spout-words")
+ (get TOPOLOGY-COMPONENT-CPU-PCORE-PERCENT))
+ 20.0)))
+
+ (testing "bolt combinations"
+ (is (= (-> (json-confs "b-1")
+ (get TOPOLOGY-COMPONENT-RESOURCES-ONHEAP-MEMORY-MB))
+ (+ 1024.0 512.0)))
+
+ (is (= (-> (json-confs "b-1")
+ (get TOPOLOGY-COMPONENT-CPU-PCORE-PERCENT))
+ 60.0)))
+
+ (testing "aggregations after partition"
+ (is (= (-> (json-confs "b-0")
+ (get TOPOLOGY-COMPONENT-RESOURCES-ONHEAP-MEMORY-MB))
+ 2048.0))
+
+ (is (= (-> (json-confs "b-0")
+ (get TOPOLOGY-COMPONENT-CPU-PCORE-PERCENT))
+ 100.0)))))))))
+
;; (deftest test-split-merge
;; (t/with-local-cluster [cluster]
;; (with-drpc [drpc]
@@ -284,7 +363,7 @@
;; (-> drpc-stream
;; (.each (fields "args") (StringLength.) (fields "len"))
;; (.project (fields "len"))))
-;;
+;;
;; (.merge topo [s1 s2])
;; (with-topology [cluster topo]
;; (is (t/ms= [[7] ["the"] ["man"]] (exec-drpc drpc "splitter" "the man")))
[2/2] storm git commit: Adding STORM-1616 to CHANGELOG.md
Posted by kn...@apache.org.
Adding STORM-1616 to CHANGELOG.md
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/885aaec4
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/885aaec4
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/885aaec4
Branch: refs/heads/1.x-branch
Commit: 885aaec4349c78016fbe6e3f8c8db3f7b8426704
Parents: d0d53be
Author: Kyle Nusbaum <Ky...@gmail.com>
Authored: Thu Mar 17 15:51:09 2016 -0500
Committer: Kyle Nusbaum <Ky...@gmail.com>
Committed: Thu Mar 17 15:51:09 2016 -0500
----------------------------------------------------------------------
CHANGELOG.md | 1 +
1 file changed, 1 insertion(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/885aaec4/CHANGELOG.md
----------------------------------------------------------------------
diff --git a/CHANGELOG.md b/CHANGELOG.md
index bb139d3..861cafd 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,4 +1,5 @@
## 1.0.0
+ * STORM-1616: Add RAS API for Trident
* STORM-1483: add storm-mongodb connector
* STORM-1614: backpressure init and cleanup changes
* STORM-1549: Add support for resetting tuple timeout from bolts via the OutputCollector