You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by kn...@apache.org on 2016/03/17 21:51:21 UTC

[1/2] storm git commit: Initial changes.

Repository: storm
Updated Branches:
  refs/heads/1.x-branch fc64e158f -> 885aaec43


Initial changes.

Signed-off-by: Kyle Nusbaum <Ky...@gmail.com>

Ready for PR

Signed-off-by: Kyle Nusbaum <Ky...@gmail.com>

Addressing comments.

Signed-off-by: Kyle Nusbaum <Ky...@gmail.com>

Addressing Comments.

Signed-off-by: Kyle Nusbaum <Ky...@gmail.com>

adding code documentation explaining math of combining component resources.

Signed-off-by: Kyle Nusbaum <Ky...@gmail.com>

Addressing comments and adding a bit more documentation.

Signed-off-by: Kyle Nusbaum <Ky...@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/d0d53be9
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/d0d53be9
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/d0d53be9

Branch: refs/heads/1.x-branch
Commit: d0d53be94a967a07ed767ec1c79738649dd70722
Parents: fc64e15
Author: Kyle Nusbaum <Ky...@gmail.com>
Authored: Tue Mar 8 15:46:23 2016 -0600
Committer: Kyle Nusbaum <Ky...@gmail.com>
Committed: Thu Mar 17 15:49:53 2016 -0500

----------------------------------------------------------------------
 .../clj/org/apache/storm/trident/testing.clj    |  12 +-
 .../ComponentConfigurationDeclarer.java         |   5 +-
 .../apache/storm/topology/ResourceDeclarer.java |  28 +++++
 .../jvm/org/apache/storm/trident/Stream.java    |  31 +++++-
 .../org/apache/storm/trident/TridentState.java  |  27 ++++-
 .../apache/storm/trident/TridentTopology.java   |  91 ++++++++++++++-
 .../org/apache/storm/trident/graph/Group.java   |  22 +++-
 .../operation/DefaultResourceDeclarer.java      |  66 +++++++++++
 .../trident/operation/ITridentResource.java     |  32 ++++++
 .../org/apache/storm/trident/planner/Node.java  |   5 +-
 .../apache/storm/trident/integration_test.clj   | 111 ++++++++++++++++---
 11 files changed, 390 insertions(+), 40 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/d0d53be9/storm-core/src/clj/org/apache/storm/trident/testing.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/trident/testing.clj b/storm-core/src/clj/org/apache/storm/trident/testing.clj
index 44e5ca9..aafb001 100644
--- a/storm-core/src/clj/org/apache/storm/trident/testing.clj
+++ b/storm-core/src/clj/org/apache/storm/trident/testing.clj
@@ -55,14 +55,14 @@
      (.shutdown ~drpc)
      ))
 
-(defn with-topology* [cluster topo body-fn]
-  (t/submit-local-topology (:nimbus cluster) "tester" {} (.build topo))
+(defn with-topology* [cluster storm-topo body-fn]
+  (t/submit-local-topology (:nimbus cluster) "tester" {} storm-topo)
   (body-fn)
-  (.killTopologyWithOpts (:nimbus cluster) "tester" (doto (KillOptions.) (.set_wait_secs 0)))
-  )
+  (.killTopologyWithOpts (:nimbus cluster) "tester" (doto (KillOptions.) (.set_wait_secs 0))))
 
-(defmacro with-topology [[cluster topo] & body]
-  `(with-topology* ~cluster ~topo (fn [] ~@body)))
+(defmacro with-topology [[cluster topo storm-topo] & body]
+  `(let [~storm-topo (.build ~topo)]
+     (with-topology* ~cluster ~storm-topo (fn [] ~@body))))
 
 (defn bootstrap-imports []
   (import 'org.apache.storm.LocalDRPC)

http://git-wip-us.apache.org/repos/asf/storm/blob/d0d53be9/storm-core/src/jvm/org/apache/storm/topology/ComponentConfigurationDeclarer.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/topology/ComponentConfigurationDeclarer.java b/storm-core/src/jvm/org/apache/storm/topology/ComponentConfigurationDeclarer.java
index 328af55..5dc7264 100644
--- a/storm-core/src/jvm/org/apache/storm/topology/ComponentConfigurationDeclarer.java
+++ b/storm-core/src/jvm/org/apache/storm/topology/ComponentConfigurationDeclarer.java
@@ -19,14 +19,11 @@ package org.apache.storm.topology;
 
 import java.util.Map;
 
-public interface ComponentConfigurationDeclarer<T extends ComponentConfigurationDeclarer> {
+public interface ComponentConfigurationDeclarer<T extends ComponentConfigurationDeclarer> extends ResourceDeclarer<T> {
     T addConfigurations(Map<String, Object> conf);
     T addConfiguration(String config, Object value);
     T setDebug(boolean debug);
     T setMaxTaskParallelism(Number val);
     T setMaxSpoutPending(Number val);
     T setNumTasks(Number val);
-    T setMemoryLoad(Number onHeap);
-    T setMemoryLoad(Number onHeap, Number offHeap);
-    T setCPULoad(Number amount);
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/d0d53be9/storm-core/src/jvm/org/apache/storm/topology/ResourceDeclarer.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/topology/ResourceDeclarer.java b/storm-core/src/jvm/org/apache/storm/topology/ResourceDeclarer.java
new file mode 100644
index 0000000..4f648eb
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/topology/ResourceDeclarer.java
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.topology;
+
+/**
+ * This is a new base interface that can be used by anything that wants to mirror
+ * RAS's basic API. Trident uses this to allow setting resources in the Stream API.
+ */
+public interface ResourceDeclarer <T extends ResourceDeclarer> {
+    T setMemoryLoad(Number onHeap);
+    T setMemoryLoad(Number onHeap, Number offHeap);
+    T setCPULoad(Number amount);
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/d0d53be9/storm-core/src/jvm/org/apache/storm/trident/Stream.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/trident/Stream.java b/storm-core/src/jvm/org/apache/storm/trident/Stream.java
index d313678..4a51b56 100644
--- a/storm-core/src/jvm/org/apache/storm/trident/Stream.java
+++ b/storm-core/src/jvm/org/apache/storm/trident/Stream.java
@@ -20,6 +20,7 @@ package org.apache.storm.trident;
 import org.apache.storm.generated.Grouping;
 import org.apache.storm.generated.NullStruct;
 import org.apache.storm.grouping.CustomStreamGrouping;
+import org.apache.storm.topology.ResourceDeclarer;
 import org.apache.storm.trident.fluent.ChainedAggregatorDeclarer;
 import org.apache.storm.trident.fluent.GlobalAggregationScheme;
 import org.apache.storm.trident.fluent.GroupedStream;
@@ -90,7 +91,7 @@ import java.util.Comparator;
  *
  */
 // TODO: need to be able to replace existing fields with the function fields (like Cascading Fields.REPLACE)
-public class Stream implements IAggregatableStream {
+public class Stream implements IAggregatableStream, ResourceDeclarer<Stream> {
     Node _node;
     TridentTopology _topology;
     String _name;
@@ -124,6 +125,34 @@ public class Stream implements IAggregatableStream {
     }
 
     /**
+     * Sets the CPU Load resource for the current operation
+     */
+    @Override
+    public Stream setCPULoad(Number load) {
+        _node.setCPULoad(load);
+        return this;
+    }
+
+    /**
+     * Sets the Memory Load resources for the current operation.
+     * offHeap becomes default
+     */
+    @Override
+    public Stream setMemoryLoad(Number onHeap) {
+        _node.setMemoryLoad(onHeap);
+        return this;
+    }
+
+    /**
+     * Sets the Memory Load resources for the current operation.
+     */
+    @Override
+    public Stream setMemoryLoad(Number onHeap, Number offHeap) {
+        _node.setMemoryLoad(onHeap, offHeap);
+        return this;
+    }
+
+    /**
      * Filters out fields from a stream, resulting in a Stream containing only the fields specified by `keepFields`.
      *
      * For example, if you had a Stream `mystream` containing the fields `["a", "b", "c","d"]`, calling"

http://git-wip-us.apache.org/repos/asf/storm/blob/d0d53be9/storm-core/src/jvm/org/apache/storm/trident/TridentState.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/trident/TridentState.java b/storm-core/src/jvm/org/apache/storm/trident/TridentState.java
index 7173254..18b60e0 100644
--- a/storm-core/src/jvm/org/apache/storm/trident/TridentState.java
+++ b/storm-core/src/jvm/org/apache/storm/trident/TridentState.java
@@ -17,24 +17,43 @@
  */
 package org.apache.storm.trident;
 
+import org.apache.storm.topology.ResourceDeclarer;
 import org.apache.storm.trident.planner.Node;
 
 
-public class TridentState {
+public class TridentState implements ResourceDeclarer<TridentState> {
     TridentTopology _topology;
     Node _node;
-    
+
     protected TridentState(TridentTopology topology, Node node) {
         _topology = topology;
         _node = node;
     }
-    
+
     public Stream newValuesStream() {
         return new Stream(_topology, _node.name, _node);
     }
-    
+
     public TridentState parallelismHint(int parallelism) {
         _node.parallelismHint = parallelism;
         return this;
     }
+
+    @Override
+    public TridentState setCPULoad(Number load) {
+        _node.setCPULoad(load);
+        return this;
+    }
+
+    @Override
+    public TridentState setMemoryLoad(Number onHeap) {
+        _node.setMemoryLoad(onHeap);
+        return this;
+    }
+
+    @Override
+    public TridentState setMemoryLoad(Number onHeap, Number offHeap) {
+        _node.setMemoryLoad(onHeap, offHeap);
+        return this;
+    }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/d0d53be9/storm-core/src/jvm/org/apache/storm/trident/TridentTopology.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/trident/TridentTopology.java b/storm-core/src/jvm/org/apache/storm/trident/TridentTopology.java
index eb50a10..e0a349b 100644
--- a/storm-core/src/jvm/org/apache/storm/trident/TridentTopology.java
+++ b/storm-core/src/jvm/org/apache/storm/trident/TridentTopology.java
@@ -44,6 +44,7 @@ import org.apache.storm.trident.fluent.UniqueIdGen;
 import org.apache.storm.trident.graph.GraphGrouper;
 import org.apache.storm.trident.graph.Group;
 import org.apache.storm.trident.operation.GroupedMultiReducer;
+import org.apache.storm.trident.operation.ITridentResource;
 import org.apache.storm.trident.operation.MultiReducer;
 import org.apache.storm.trident.operation.impl.FilterExecutor;
 import org.apache.storm.trident.operation.impl.GroupedMultiReducerExecutor;
@@ -394,11 +395,23 @@ public class TridentTopology {
         Map<Node, String> spoutIds = genSpoutIds(spoutNodes);
         Map<Group, String> boltIds = genBoltIds(mergedGroups);
         
+        Map defaults = Utils.readDefaultConfig();
+
         for(SpoutNode sn: spoutNodes) {
             Integer parallelism = parallelisms.get(grouper.nodeGroup(sn));
+
+            Map<String, Number> spoutRes = null;
+            spoutRes = mergeDefaultResources(sn.getResources(), defaults);
+            Number onHeap = spoutRes.get(Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB);
+            Number offHeap = spoutRes.get(Config.TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB);
+            Number cpuLoad = spoutRes.get(Config.TOPOLOGY_COMPONENT_CPU_PCORE_PERCENT);
+
             if(sn.type == SpoutNode.SpoutType.DRPC) {
+
                 builder.setBatchPerTupleSpout(spoutIds.get(sn), sn.streamId,
-                        (IRichSpout) sn.spout, parallelism, batchGroupMap.get(sn));
+                                              (IRichSpout) sn.spout, parallelism, batchGroupMap.get(sn))
+                    .setMemoryLoad(onHeap, offHeap)
+                    .setCPULoad(cpuLoad);
             } else {
                 ITridentSpout s;
                 if(sn.spout instanceof IBatchSpout) {
@@ -409,16 +422,26 @@ public class TridentTopology {
                     throw new RuntimeException("Regular rich spouts not supported yet... try wrapping in a RichSpoutBatchExecutor");
                     // TODO: handle regular rich spout without batches (need lots of updates to support this throughout)
                 }
-                builder.setSpout(spoutIds.get(sn), sn.streamId, sn.txId, s, parallelism, batchGroupMap.get(sn));
+                builder.setSpout(spoutIds.get(sn), sn.streamId, sn.txId, s, parallelism, batchGroupMap.get(sn))
+                    .setMemoryLoad(onHeap, offHeap)
+                    .setCPULoad(cpuLoad);
             }
         }
-        
+
         for(Group g: mergedGroups) {
             if(!isSpoutGroup(g)) {
                 Integer p = parallelisms.get(g);
                 Map<String, String> streamToGroup = getOutputStreamBatchGroups(g, batchGroupMap);
+                Map<String, Number> groupRes = mergeDefaultResources(g.getResources(), defaults);
+
+                Number onHeap = groupRes.get(Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB);
+                Number offHeap = groupRes.get(Config.TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB);
+                Number cpuLoad = groupRes.get(Config.TOPOLOGY_COMPONENT_CPU_PCORE_PERCENT);
+
                 BoltDeclarer d = builder.setBolt(boltIds.get(g), new SubtopologyBolt(graph, g.nodes, batchGroupMap), p,
-                        committerBatches(g, batchGroupMap), streamToGroup);
+                                                 committerBatches(g, batchGroupMap), streamToGroup)
+                    .setMemoryLoad(onHeap, offHeap)
+                    .setCPULoad(cpuLoad);
                 Collection<PartitionNode> inputs = uniquedSubscriptions(externalGroupInputs(g));
                 for(PartitionNode n: inputs) {
                     Node parent = TridentUtils.getParent(graph, n);
@@ -431,6 +454,64 @@ public class TridentTopology {
         
         return builder.buildTopology();
     }
+
+    private static Map<String, Number> mergeDefaultResources(Map<String, Number> res, Map defaultConfig) {
+        Map<String, Number> ret = new HashMap<String, Number>();
+
+        Number onHeapDefault = (Number)defaultConfig.get(Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB);
+        Number offHeapDefault = (Number)defaultConfig.get(Config.TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB);
+        Number cpuLoadDefault = (Number)defaultConfig.get(Config.TOPOLOGY_COMPONENT_CPU_PCORE_PERCENT);
+
+        if(res == null) {
+            ret.put(Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB, onHeapDefault);
+            ret.put(Config.TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB, offHeapDefault);
+            ret.put(Config.TOPOLOGY_COMPONENT_CPU_PCORE_PERCENT, cpuLoadDefault);
+            return ret;
+        }
+
+        Number onHeap = res.get(Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB);
+        Number offHeap = res.get(Config.TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB);
+        Number cpuLoad = res.get(Config.TOPOLOGY_COMPONENT_CPU_PCORE_PERCENT);
+
+        /* We take the max of the default and whatever the user put in here.
+           Each node's resources can be the sum of several operations, so the simplest
+           thing to do is get the max.
+
+           The situation we want to avoid is that the user sets low resources on one
+           node, and when that node is combined with a bunch of others, the sum is still
+           that low resource count. If any component isn't set, we want to use the default.
+
+           Right now, this code does not check that. It just takes the max of the summed
+           up resource counts for simplicity's sake. We could perform some more complicated
+           logic to be more accurate, but the benefits are very small, and only apply to some
+           very odd corner cases. */
+        if(onHeap == null) {
+            onHeap = onHeapDefault;
+        }
+        else {
+            onHeap = Math.max(onHeap.doubleValue(), onHeapDefault.doubleValue());
+        }
+
+        if(offHeap == null) {
+            offHeap = offHeapDefault;
+        }
+        else {
+            offHeap = Math.max(offHeap.doubleValue(), offHeapDefault.doubleValue());
+        }
+
+        if(cpuLoad == null) {
+            cpuLoad = cpuLoadDefault;
+        }
+        else {
+            cpuLoad = Math.max(cpuLoad.doubleValue(), cpuLoadDefault.doubleValue());
+        }
+
+        ret.put(Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB, onHeap);
+        ret.put(Config.TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB, offHeap);
+        ret.put(Config.TOPOLOGY_COMPONENT_CPU_PCORE_PERCENT, cpuLoad);
+
+        return ret;
+    }
     
     private static void completeDRPC(DefaultDirectedGraph<Node, IndexedEdge> graph, Map<String, List<Node>> colocate, UniqueIdGen gen) {
         List<Set<Node>> connectedComponents = new ConnectivityInspector<>(graph).connectedSets();
@@ -464,7 +545,7 @@ public class TridentTopology {
         }
         return ret;
     }
-    
+
     //returns null if it's not a drpc group
     private static SpoutNode getDRPCSpoutNode(Collection<Node> g) {
         for(Node n: g) {

http://git-wip-us.apache.org/repos/asf/storm/blob/d0d53be9/storm-core/src/jvm/org/apache/storm/trident/graph/Group.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/trident/graph/Group.java b/storm-core/src/jvm/org/apache/storm/trident/graph/Group.java
index ef1399b..2c92304 100644
--- a/storm-core/src/jvm/org/apache/storm/trident/graph/Group.java
+++ b/storm-core/src/jvm/org/apache/storm/trident/graph/Group.java
@@ -18,17 +18,20 @@
 package org.apache.storm.trident.graph;
 
 import java.util.Arrays;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 import java.util.UUID;
 import org.jgrapht.DirectedGraph;
+import org.apache.storm.trident.operation.ITridentResource;
 import org.apache.storm.trident.planner.Node;
 import org.apache.storm.trident.util.IndexedEdge;
 import org.apache.storm.trident.util.TridentUtils;
 
 
-public class Group {
+public class Group implements ITridentResource {
     public final Set<Node> nodes = new HashSet<>();
     private final DirectedGraph<Node, IndexedEdge> graph;
     private final String id = UUID.randomUUID().toString();
@@ -65,6 +68,23 @@ public class Group {
     }
 
     @Override
+    public Map<String, Number> getResources() {
+        Map<String, Number> ret = new HashMap<>();
+        for(Node n: nodes) {
+            Map<String, Number> res = n.getResources();
+            for(Map.Entry<String, Number> kv : res.entrySet()) {
+                String key = kv.getKey();
+                Number val = kv.getValue();
+                if(ret.containsKey(key)) {
+                    val = new Double(val.doubleValue() + ret.get(key).doubleValue());
+                }
+                ret.put(key, val);
+            }
+        }
+        return ret;
+    }
+
+    @Override
     public int hashCode() {
         return id.hashCode();
     }

http://git-wip-us.apache.org/repos/asf/storm/blob/d0d53be9/storm-core/src/jvm/org/apache/storm/trident/operation/DefaultResourceDeclarer.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/trident/operation/DefaultResourceDeclarer.java b/storm-core/src/jvm/org/apache/storm/trident/operation/DefaultResourceDeclarer.java
new file mode 100644
index 0000000..d49011a
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/trident/operation/DefaultResourceDeclarer.java
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.trident.operation;
+
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.storm.Config;
+import org.apache.storm.utils.Utils;
+import org.apache.storm.topology.ResourceDeclarer;
+
+/**
+ * @param T Must always be the type of the extending class. i.e.
+ * public class SubResourceDeclarer extends DefaultResourceDeclarer<SubResourceDeclarer> {...}
+ */
+public class DefaultResourceDeclarer<T extends DefaultResourceDeclarer> implements ResourceDeclarer<T>, ITridentResource {
+
+    private Map<String, Number> resources = new HashMap<>();
+    private Map conf = Utils.readStormConfig();
+
+    @Override
+    public T setMemoryLoad(Number onHeap) {
+        return setMemoryLoad(onHeap, Utils.getDouble(conf.get(Config.TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB)));
+    }
+
+    @Override
+    public T setMemoryLoad(Number onHeap, Number offHeap) {
+        if (onHeap != null) {
+            onHeap = onHeap.doubleValue();
+            resources.put(Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB, onHeap);
+        }
+        if (offHeap!=null) {
+            offHeap = offHeap.doubleValue();
+            resources.put(Config.TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB, offHeap);
+        }
+        return (T)this;
+    }
+
+    @Override
+    public T setCPULoad(Number amount) {
+        if(amount != null) {
+            amount = amount.doubleValue();
+            resources.put(Config.TOPOLOGY_COMPONENT_CPU_PCORE_PERCENT, amount);
+        }
+        return (T)this;
+    }
+
+    @Override
+    public Map<String, Number> getResources() {
+        return new HashMap<String, Number>(resources);
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/d0d53be9/storm-core/src/jvm/org/apache/storm/trident/operation/ITridentResource.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/trident/operation/ITridentResource.java b/storm-core/src/jvm/org/apache/storm/trident/operation/ITridentResource.java
new file mode 100644
index 0000000..b3e10ef
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/trident/operation/ITridentResource.java
@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.trident.operation;
+
+import java.util.Map;
+
+/**
+ * This interface is implemented by various Trident classes in order to
+ * gather and propogate resources that have been set on them.
+ * @see ResourceDeclarer
+ */
+public interface ITridentResource {
+    /**
+     * @return a name of resource name -> amount of that resource. *Return should never be null!*
+     */
+    Map<String, Number> getResources();
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/d0d53be9/storm-core/src/jvm/org/apache/storm/trident/planner/Node.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/trident/planner/Node.java b/storm-core/src/jvm/org/apache/storm/trident/planner/Node.java
index 64d8a3b..b2466e6 100644
--- a/storm-core/src/jvm/org/apache/storm/trident/planner/Node.java
+++ b/storm-core/src/jvm/org/apache/storm/trident/planner/Node.java
@@ -17,6 +17,7 @@
  */
 package org.apache.storm.trident.planner;
 
+import org.apache.storm.trident.operation.DefaultResourceDeclarer;
 import org.apache.storm.tuple.Fields;
 import java.io.Serializable;
 import java.util.UUID;
@@ -25,7 +26,7 @@ import org.apache.commons.lang.builder.ToStringBuilder;
 import org.apache.commons.lang.builder.ToStringStyle;
 
 
-public class Node implements Serializable {
+public class Node extends DefaultResourceDeclarer<Node> implements Serializable {
     private static final AtomicInteger INDEX = new AtomicInteger(0);
     
     private String nodeId;
@@ -62,6 +63,4 @@ public class Node implements Serializable {
     public String toString() {
         return ToStringBuilder.reflectionToString(this, ToStringStyle.MULTI_LINE_STYLE);
     }
-    
-    
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/d0d53be9/storm-core/test/clj/integration/org/apache/storm/trident/integration_test.clj
----------------------------------------------------------------------
diff --git a/storm-core/test/clj/integration/org/apache/storm/trident/integration_test.clj b/storm-core/test/clj/integration/org/apache/storm/trident/integration_test.clj
index 4c52286..7f81c4b 100644
--- a/storm-core/test/clj/integration/org/apache/storm/trident/integration_test.clj
+++ b/storm-core/test/clj/integration/org/apache/storm/trident/integration_test.clj
@@ -19,10 +19,13 @@
   (:import [org.apache.storm.trident.testing Split CountAsAggregator StringLength TrueFilter
             MemoryMapState$Factory])
   (:import [org.apache.storm.trident.state StateSpec])
-  (:import [org.apache.storm.trident.operation.impl CombinerAggStateUpdater])
-  (:use [org.apache.storm.trident testing])
-  (:use [org.apache.storm util]))
-  
+  (:import [org.apache.storm.trident.operation.impl CombinerAggStateUpdater]
+           [org.apache.storm.trident.operation BaseFunction]
+           [org.json.simple.parser JSONParser]
+           [org.apache.storm Config])
+  (:use [org.apache.storm.trident testing]
+        [org.apache.storm log util config]))
+
 (bootstrap-imports)
 
 (deftest test-memory-map-get-tuples
@@ -38,13 +41,13 @@
               (.groupBy (fields "word"))
               (.persistentAggregate (memory-map-state) (Count.) (fields "count"))
               (.parallelismHint 6)
-              ))       
+              ))
         (-> topo
             (.newDRPCStream "all-tuples" drpc)
             (.broadcast)
             (.stateQuery word-counts (fields "args") (TupleCollectionGet.) (fields "word" "count"))
             (.project (fields "word" "count")))
-        (with-topology [cluster topo]
+        (with-topology [cluster topo storm-topo]
           (feed feeder [["hello the man said"] ["the"]])
           (is (= #{["hello" 1] ["said" 1] ["the" 2] ["man" 1]}
                  (into #{} (exec-drpc drpc "all-tuples" "man"))))
@@ -73,7 +76,7 @@
             (.stateQuery word-counts (fields "word") (MapGet.) (fields "count"))
             (.aggregate (fields "count") (Sum.) (fields "sum"))
             (.project (fields "sum")))
-        (with-topology [cluster topo]
+        (with-topology [cluster topo storm-topo]
           (feed feeder [["hello the man said"] ["the"]])
           (is (= [[2]] (exec-drpc drpc "words" "the")))
           (is (= [[1]] (exec-drpc drpc "words" "hello")))
@@ -83,7 +86,7 @@
           (is (= [[8]] (exec-drpc drpc "words" "man where you the")))
           )))))
 
-;; this test reproduces a bug where committer spouts freeze processing when 
+;; this test reproduces a bug where committer spouts freeze processing when
 ;; there's at least one repartitioning after the spout
 (deftest test-word-count-committer-spout
   (t/with-local-cluster [cluster]
@@ -108,7 +111,7 @@
             (.stateQuery word-counts (fields "word") (MapGet.) (fields "count"))
             (.aggregate (fields "count") (Sum.) (fields "sum"))
             (.project (fields "sum")))
-        (with-topology [cluster topo]
+        (with-topology [cluster topo storm-topo]
           (feed feeder [["hello the man said"] ["the"]])
           (is (= [[2]] (exec-drpc drpc "words" "the")))
           (is (= [[1]] (exec-drpc drpc "words" "hello")))
@@ -135,13 +138,13 @@
             (.aggregate (CountAsAggregator.) (fields "count"))
             (.parallelismHint 2) ;;this makes sure batchGlobal is working correctly
             (.project (fields "count")))
-        (with-topology [cluster topo]
+        (with-topology [cluster topo storm-topo]
           (doseq [i (range 100)]
             (is (= [[1]] (exec-drpc drpc "numwords" "the"))))
           (is (= [[0]] (exec-drpc drpc "numwords" "")))
           (is (= [[8]] (exec-drpc drpc "numwords" "1 2 3 4 5 6 7 8")))
           )))))
-          
+
 (deftest test-split-merge
   (t/with-local-cluster [cluster]
     (with-drpc [drpc]
@@ -158,7 +161,7 @@
               (.project (fields "len"))))
 
         (.merge topo [s1 s2])
-        (with-topology [cluster topo]
+        (with-topology [cluster topo storm-topo]
           (is (t/ms= [[7] ["the"] ["man"]] (exec-drpc drpc "splitter" "the man")))
           (is (t/ms= [[5] ["hello"]] (exec-drpc drpc "splitter" "hello")))
           )))))
@@ -180,11 +183,11 @@
               (.aggregate (CountAsAggregator.) (fields "count"))))
 
         (.merge topo [s1 s2])
-        (with-topology [cluster topo]
+        (with-topology [cluster topo storm-topo]
           (is (t/ms= [["the" 1] ["the" 1]] (exec-drpc drpc "tester" "the")))
           (is (t/ms= [["aaaaa" 1] ["aaaaa" 1]] (exec-drpc drpc "tester" "aaaaa")))
           )))))
-          
+
 (deftest test-multi-repartition
   (t/with-local-cluster [cluster]
     (with-drpc [drpc]
@@ -196,7 +199,7 @@
                                    (.shuffle)
                                    (.aggregate (CountAsAggregator.) (fields "count"))
                                    ))
-        (with-topology [cluster topo]
+        (with-topology [cluster topo storm-topo]
           (is (t/ms= [[2]] (exec-drpc drpc "tester" "the man")))
           (is (t/ms= [[1]] (exec-drpc drpc "tester" "aaa")))
           )))))
@@ -270,6 +273,82 @@
                         (.stateQuery word-counts (fields "word1") (MapGet.) (fields "count"))))))
      )))
 
+
+(deftest test-set-component-resources
+  (t/with-local-cluster [cluster]
+    (with-drpc [drpc]
+      (letlocals
+        (bind topo (TridentTopology.))
+        (bind feeder (feeder-spout ["sentence"]))
+        (bind add-bang (proxy [BaseFunction] []
+                         (execute [tuple collector]
+                           (. collector emit (str (. tuple getString 0) "!")))))
+        (bind word-counts
+          (.. topo
+              (newStream "words" feeder)
+              (parallelismHint 5)
+              (setCPULoad 20)
+              (setMemoryLoad 512 256)
+              (each (fields "sentence") (Split.) (fields "word"))
+              (setCPULoad 10)
+              (setMemoryLoad 512)
+              (each (fields "word") add-bang (fields "word!"))
+              (parallelismHint 10)
+              (setCPULoad 50)
+              (setMemoryLoad 1024)
+              (groupBy (fields "word!"))
+              (persistentAggregate (memory-map-state) (Count.) (fields "count"))
+              (setCPULoad 100)
+              (setMemoryLoad 2048)))
+        (with-topology [cluster topo storm-topo]
+
+          (let [parse-fn (fn [[k v]]
+                           [k (clojurify-structure (. (JSONParser.) parse (.. v get_common get_json_conf)))])
+                json-confs (into {} (map parse-fn (. storm-topo get_bolts)))]
+            (testing "spout memory"
+              (is (= (-> (json-confs "spout-words")
+                         (get TOPOLOGY-COMPONENT-RESOURCES-ONHEAP-MEMORY-MB))
+                     512.0))
+
+              (is (= (-> (json-confs "spout-words")
+                         (get TOPOLOGY-COMPONENT-RESOURCES-OFFHEAP-MEMORY-MB))
+                   256.0))
+
+              (is (= (-> (json-confs "$spoutcoord-spout-words")
+                         (get TOPOLOGY-COMPONENT-RESOURCES-ONHEAP-MEMORY-MB))
+                     512.0))
+
+              (is (= (-> (json-confs "$spoutcoord-spout-words")
+                         (get TOPOLOGY-COMPONENT-RESOURCES-OFFHEAP-MEMORY-MB))
+                     256.0)))
+
+            (testing "spout CPU"
+              (is (= (-> (json-confs "spout-words")
+                         (get TOPOLOGY-COMPONENT-CPU-PCORE-PERCENT))
+                     20.0))
+
+              (is (= (-> (json-confs "$spoutcoord-spout-words")
+                       (get TOPOLOGY-COMPONENT-CPU-PCORE-PERCENT))
+                     20.0)))
+
+            (testing "bolt combinations"
+              (is (= (-> (json-confs "b-1")
+                         (get TOPOLOGY-COMPONENT-RESOURCES-ONHEAP-MEMORY-MB))
+                     (+ 1024.0 512.0)))
+
+              (is (= (-> (json-confs "b-1")
+                         (get TOPOLOGY-COMPONENT-CPU-PCORE-PERCENT))
+                     60.0)))
+
+            (testing "aggregations after partition"
+              (is (= (-> (json-confs "b-0")
+                         (get TOPOLOGY-COMPONENT-RESOURCES-ONHEAP-MEMORY-MB))
+                     2048.0))
+
+              (is (= (-> (json-confs "b-0")
+                         (get TOPOLOGY-COMPONENT-CPU-PCORE-PERCENT))
+                     100.0)))))))))
+
 ;; (deftest test-split-merge
 ;;   (t/with-local-cluster [cluster]
 ;;     (with-drpc [drpc]
@@ -284,7 +363,7 @@
 ;;           (-> drpc-stream
 ;;               (.each (fields "args") (StringLength.) (fields "len"))
 ;;               (.project (fields "len"))))
-;; 
+;;
 ;;         (.merge topo [s1 s2])
 ;;         (with-topology [cluster topo]
 ;;           (is (t/ms= [[7] ["the"] ["man"]] (exec-drpc drpc "splitter" "the man")))


[2/2] storm git commit: Adding STORM-1616 to CHANGELOG.md

Posted by kn...@apache.org.
Adding STORM-1616 to CHANGELOG.md


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/885aaec4
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/885aaec4
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/885aaec4

Branch: refs/heads/1.x-branch
Commit: 885aaec4349c78016fbe6e3f8c8db3f7b8426704
Parents: d0d53be
Author: Kyle Nusbaum <Ky...@gmail.com>
Authored: Thu Mar 17 15:51:09 2016 -0500
Committer: Kyle Nusbaum <Ky...@gmail.com>
Committed: Thu Mar 17 15:51:09 2016 -0500

----------------------------------------------------------------------
 CHANGELOG.md | 1 +
 1 file changed, 1 insertion(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/885aaec4/CHANGELOG.md
----------------------------------------------------------------------
diff --git a/CHANGELOG.md b/CHANGELOG.md
index bb139d3..861cafd 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,4 +1,5 @@
 ## 1.0.0
+ * STORM-1616: Add RAS API for Trident
  * STORM-1483: add storm-mongodb connector
  * STORM-1614: backpressure init and cleanup changes
  * STORM-1549: Add support for resetting tuple timeout from bolts via the OutputCollector