You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by kn...@apache.org on 2016/03/17 21:04:29 UTC

[1/8] storm git commit: Initial changes.

Repository: storm
Updated Branches:
  refs/heads/master 50701df4a -> 367464a3d


Initial changes.


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/17a55d20
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/17a55d20
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/17a55d20

Branch: refs/heads/master
Commit: 17a55d20d3f20bc04ca48ee3a9f63eaed8960b9c
Parents: b477939
Author: Kyle Nusbaum <Ky...@gmail.com>
Authored: Tue Mar 8 15:46:23 2016 -0600
Committer: Kyle Nusbaum <Ky...@gmail.com>
Committed: Tue Mar 8 15:46:23 2016 -0600

----------------------------------------------------------------------
 .../ComponentConfigurationDeclarer.java         |  5 +-
 .../apache/storm/topology/ResourceDeclarer.java | 24 ++++++
 .../apache/storm/trident/TridentTopology.java   | 84 ++++++++++++++++++--
 .../org/apache/storm/trident/graph/Group.java   | 24 +++++-
 .../operation/DefaultResourceDeclarer.java      | 62 +++++++++++++++
 .../trident/operation/ITridentResource.java     | 24 ++++++
 6 files changed, 213 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/17a55d20/storm-core/src/jvm/org/apache/storm/topology/ComponentConfigurationDeclarer.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/topology/ComponentConfigurationDeclarer.java b/storm-core/src/jvm/org/apache/storm/topology/ComponentConfigurationDeclarer.java
index 328af55..5dc7264 100644
--- a/storm-core/src/jvm/org/apache/storm/topology/ComponentConfigurationDeclarer.java
+++ b/storm-core/src/jvm/org/apache/storm/topology/ComponentConfigurationDeclarer.java
@@ -19,14 +19,11 @@ package org.apache.storm.topology;
 
 import java.util.Map;
 
-public interface ComponentConfigurationDeclarer<T extends ComponentConfigurationDeclarer> {
+public interface ComponentConfigurationDeclarer<T extends ComponentConfigurationDeclarer> extends ResourceDeclarer<T> {
     T addConfigurations(Map<String, Object> conf);
     T addConfiguration(String config, Object value);
     T setDebug(boolean debug);
     T setMaxTaskParallelism(Number val);
     T setMaxSpoutPending(Number val);
     T setNumTasks(Number val);
-    T setMemoryLoad(Number onHeap);
-    T setMemoryLoad(Number onHeap, Number offHeap);
-    T setCPULoad(Number amount);
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/17a55d20/storm-core/src/jvm/org/apache/storm/topology/ResourceDeclarer.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/topology/ResourceDeclarer.java b/storm-core/src/jvm/org/apache/storm/topology/ResourceDeclarer.java
new file mode 100644
index 0000000..de530b3
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/topology/ResourceDeclarer.java
@@ -0,0 +1,24 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.topology;
+
+public interface ResourceDeclarer <T extends ResourceDeclarer> {
+    T setMemoryLoad(Number onHeap);
+    T setMemoryLoad(Number onHeap, Number offHeap);
+    T setCPULoad(Number amount);
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/17a55d20/storm-core/src/jvm/org/apache/storm/trident/TridentTopology.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/trident/TridentTopology.java b/storm-core/src/jvm/org/apache/storm/trident/TridentTopology.java
index eb50a10..3836663 100644
--- a/storm-core/src/jvm/org/apache/storm/trident/TridentTopology.java
+++ b/storm-core/src/jvm/org/apache/storm/trident/TridentTopology.java
@@ -44,6 +44,7 @@ import org.apache.storm.trident.fluent.UniqueIdGen;
 import org.apache.storm.trident.graph.GraphGrouper;
 import org.apache.storm.trident.graph.Group;
 import org.apache.storm.trident.operation.GroupedMultiReducer;
+import org.apache.storm.trident.operation.ITridentResource;
 import org.apache.storm.trident.operation.MultiReducer;
 import org.apache.storm.trident.operation.impl.FilterExecutor;
 import org.apache.storm.trident.operation.impl.GroupedMultiReducerExecutor;
@@ -394,11 +395,28 @@ public class TridentTopology {
         Map<Node, String> spoutIds = genSpoutIds(spoutNodes);
         Map<Group, String> boltIds = genBoltIds(mergedGroups);
         
+        Map defaults = Utils.readDefaultConfig();
+
         for(SpoutNode sn: spoutNodes) {
             Integer parallelism = parallelisms.get(grouper.nodeGroup(sn));
+
+            Map<String, Number> spoutRes = null;
+            if(sn instanceof ITridentResource) {
+                spoutRes = mergeDefaultResources(((ITridentResource)sn).getResources(), defaults);
+            }
+            else {
+                spoutRes = mergeDefaultResources(null, defaults);
+            }
+            Number onHeap = spoutRes.get(Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB);
+            Number offHeap = spoutRes.get(Config.TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB);
+            Number cpuLoad = spoutRes.get(Config.TOPOLOGY_COMPONENT_CPU_PCORE_PERCENT);
+
             if(sn.type == SpoutNode.SpoutType.DRPC) {
+
                 builder.setBatchPerTupleSpout(spoutIds.get(sn), sn.streamId,
-                        (IRichSpout) sn.spout, parallelism, batchGroupMap.get(sn));
+                                              (IRichSpout) sn.spout, parallelism, batchGroupMap.get(sn))
+                    .setMemoryLoad(onHeap, offHeap)
+                    .setCPULoad(cpuLoad);
             } else {
                 ITridentSpout s;
                 if(sn.spout instanceof IBatchSpout) {
@@ -409,16 +427,26 @@ public class TridentTopology {
                     throw new RuntimeException("Regular rich spouts not supported yet... try wrapping in a RichSpoutBatchExecutor");
                     // TODO: handle regular rich spout without batches (need lots of updates to support this throughout)
                 }
-                builder.setSpout(spoutIds.get(sn), sn.streamId, sn.txId, s, parallelism, batchGroupMap.get(sn));
+                builder.setSpout(spoutIds.get(sn), sn.streamId, sn.txId, s, parallelism, batchGroupMap.get(sn))
+                    .setMemoryLoad(onHeap, offHeap)
+                    .setCPULoad(cpuLoad);
             }
         }
-        
+
         for(Group g: mergedGroups) {
             if(!isSpoutGroup(g)) {
                 Integer p = parallelisms.get(g);
                 Map<String, String> streamToGroup = getOutputStreamBatchGroups(g, batchGroupMap);
+                Map<String, Number> groupRes = mergeDefaultResources(g.getResources(), defaults);
+
+                Number onHeap = groupRes.get(Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB);
+                Number offHeap = groupRes.get(Config.TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB);
+                Number cpuLoad = groupRes.get(Config.TOPOLOGY_COMPONENT_CPU_PCORE_PERCENT);
+
                 BoltDeclarer d = builder.setBolt(boltIds.get(g), new SubtopologyBolt(graph, g.nodes, batchGroupMap), p,
-                        committerBatches(g, batchGroupMap), streamToGroup);
+                                                 committerBatches(g, batchGroupMap), streamToGroup)
+                    .setMemoryLoad(onHeap, offHeap)
+                    .setCPULoad(cpuLoad);
                 Collection<PartitionNode> inputs = uniquedSubscriptions(externalGroupInputs(g));
                 for(PartitionNode n: inputs) {
                     Node parent = TridentUtils.getParent(graph, n);
@@ -431,6 +459,52 @@ public class TridentTopology {
         
         return builder.buildTopology();
     }
+
+    private static Map<String, Number> mergeDefaultResources(Map<String, Number> res, Map defaultConfig) {
+        Map<String, Number> ret = new HashMap<String, Number>();
+
+        Number  onHeapDefault = (Number)defaultConfig.get(Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB);
+        Number offHeapDefault = (Number)defaultConfig.get(Config.TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB);
+        Number cpuLoadDefault = (Number)defaultConfig.get(Config.TOPOLOGY_COMPONENT_CPU_PCORE_PERCENT);
+
+        if(res == null) {
+            ret.put(Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB, onHeapDefault);
+            ret.put(Config.TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB, offHeapDefault);
+            ret.put(Config.TOPOLOGY_COMPONENT_CPU_PCORE_PERCENT, cpuLoadDefault);
+            return ret;
+        }
+
+        Number  onHeap = res.get(Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB);
+        Number offHeap = res.get(Config.TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB);
+        Number cpuLoad = res.get(Config.TOPOLOGY_COMPONENT_CPU_PCORE_PERCENT);
+
+        if(onHeap == null) {
+            onHeap = onHeapDefault;
+        }
+        else {
+            onHeap = Math.max(onHeap.doubleValue(), onHeapDefault.doubleValue());
+        }
+
+        if(offHeap == null) {
+            offHeap = offHeapDefault;
+        }
+        else {
+            offHeap = Math.max(offHeap.doubleValue(), offHeapDefault.doubleValue());
+        }
+
+        if(cpuLoad == null) {
+            cpuLoad = cpuLoadDefault;
+        }
+        else {
+            cpuLoad = Math.max(cpuLoad.doubleValue(), cpuLoadDefault.doubleValue());
+        }
+
+        ret.put(Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB, onHeap);
+        ret.put(Config.TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB, offHeap);
+        ret.put(Config.TOPOLOGY_COMPONENT_CPU_PCORE_PERCENT, cpuLoad);
+
+        return ret;
+    }
     
     private static void completeDRPC(DefaultDirectedGraph<Node, IndexedEdge> graph, Map<String, List<Node>> colocate, UniqueIdGen gen) {
         List<Set<Node>> connectedComponents = new ConnectivityInspector<>(graph).connectedSets();
@@ -464,7 +538,7 @@ public class TridentTopology {
         }
         return ret;
     }
-    
+
     //returns null if it's not a drpc group
     private static SpoutNode getDRPCSpoutNode(Collection<Node> g) {
         for(Node n: g) {

http://git-wip-us.apache.org/repos/asf/storm/blob/17a55d20/storm-core/src/jvm/org/apache/storm/trident/graph/Group.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/trident/graph/Group.java b/storm-core/src/jvm/org/apache/storm/trident/graph/Group.java
index ef1399b..a61e3f5 100644
--- a/storm-core/src/jvm/org/apache/storm/trident/graph/Group.java
+++ b/storm-core/src/jvm/org/apache/storm/trident/graph/Group.java
@@ -18,17 +18,20 @@
 package org.apache.storm.trident.graph;
 
 import java.util.Arrays;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 import java.util.UUID;
 import org.jgrapht.DirectedGraph;
+import org.apache.storm.trident.operation.ITridentResource;
 import org.apache.storm.trident.planner.Node;
 import org.apache.storm.trident.util.IndexedEdge;
 import org.apache.storm.trident.util.TridentUtils;
 
 
-public class Group {
+public class Group implements ITridentResource {
     public final Set<Node> nodes = new HashSet<>();
     private final DirectedGraph<Node, IndexedEdge> graph;
     private final String id = UUID.randomUUID().toString();
@@ -65,6 +68,25 @@ public class Group {
     }
 
     @Override
+    public Map<String, Number> getResources() {
+        Map<String, Number> ret = new HashMap<>();
+        for(Node n: nodes) {
+            if(n instanceof ITridentResource) {
+                Map<String, Number> res = ((ITridentResource)n).getResources();
+                for(Map.Entry<String, Number> kv : res.entrySet()) {
+                    String key = kv.getKey();
+                    Number val = kv.getValue();
+                    if(ret.containsKey(key)) {
+                        val = new Double(val.doubleValue() + ret.get(key).doubleValue());
+                    }
+                    ret.put(key, val);
+                }
+            }
+        }
+        return ret;
+    }
+
+    @Override
     public int hashCode() {
         return id.hashCode();
     }

http://git-wip-us.apache.org/repos/asf/storm/blob/17a55d20/storm-core/src/jvm/org/apache/storm/trident/operation/DefaultResourceDeclarer.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/trident/operation/DefaultResourceDeclarer.java b/storm-core/src/jvm/org/apache/storm/trident/operation/DefaultResourceDeclarer.java
new file mode 100644
index 0000000..72ca27e
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/trident/operation/DefaultResourceDeclarer.java
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.trident.operation;
+
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.storm.Config;
+import org.apache.storm.utils.Utils;
+import org.apache.storm.topology.ResourceDeclarer;
+
+public class DefaultResourceDeclarer implements ResourceDeclarer, ITridentResource {
+
+    private Map<String, Number> resources = new HashMap<>();
+    private Map conf = Utils.readStormConfig();
+
+    @Override
+    public DefaultResourceDeclarer setMemoryLoad(Number onHeap) {
+        return setMemoryLoad(onHeap, Utils.getDouble(conf.get(Config.TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB)));
+    }
+
+    @Override
+    public DefaultResourceDeclarer setMemoryLoad(Number onHeap, Number offHeap) {
+        if (onHeap != null) {
+            onHeap = onHeap.doubleValue();
+            resources.put(Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB, onHeap);
+        }
+        if (offHeap!=null) {
+            offHeap = offHeap.doubleValue();
+            resources.put(Config.TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB, offHeap);
+        }
+        return this;
+    }
+
+    @Override
+    public DefaultResourceDeclarer setCPULoad(Number amount) {
+        if(amount != null) {
+            amount = amount.doubleValue();
+            resources.put(Config.TOPOLOGY_COMPONENT_CPU_PCORE_PERCENT, amount);
+        }
+        return this;
+    }
+
+    @Override
+    public Map<String, Number> getResources() {
+        return new HashMap<String, Number>(resources);
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/17a55d20/storm-core/src/jvm/org/apache/storm/trident/operation/ITridentResource.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/trident/operation/ITridentResource.java b/storm-core/src/jvm/org/apache/storm/trident/operation/ITridentResource.java
new file mode 100644
index 0000000..4b8a047
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/trident/operation/ITridentResource.java
@@ -0,0 +1,24 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.trident.operation;
+
+import java.util.Map;
+
+public interface ITridentResource {
+    Map<String, Number> getResources();
+}


[7/8] storm git commit: Merge branch 'Trident-RAS-v2' of https://github.com/knusbaum/incubator-storm

Posted by kn...@apache.org.
Merge branch 'Trident-RAS-v2' of https://github.com/knusbaum/incubator-storm


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/fb880cae
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/fb880cae
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/fb880cae

Branch: refs/heads/master
Commit: fb880cae02415889708bb38138795a886164db2f
Parents: 50701df 251bc56
Author: Kyle Nusbaum <Ky...@gmail.com>
Authored: Thu Mar 17 15:03:07 2016 -0500
Committer: Kyle Nusbaum <Ky...@gmail.com>
Committed: Thu Mar 17 15:03:07 2016 -0500

----------------------------------------------------------------------
 .../clj/org/apache/storm/trident/testing.clj    |  12 +-
 .../ComponentConfigurationDeclarer.java         |   5 +-
 .../apache/storm/topology/ResourceDeclarer.java |  28 +++++
 .../jvm/org/apache/storm/trident/Stream.java    |  31 +++++-
 .../org/apache/storm/trident/TridentState.java  |  27 ++++-
 .../apache/storm/trident/TridentTopology.java   |  91 ++++++++++++++-
 .../org/apache/storm/trident/graph/Group.java   |  22 +++-
 .../operation/DefaultResourceDeclarer.java      |  66 +++++++++++
 .../trident/operation/ITridentResource.java     |  32 ++++++
 .../org/apache/storm/trident/planner/Node.java  |   5 +-
 .../apache/storm/trident/integration_test.clj   | 110 ++++++++++++++++---
 11 files changed, 390 insertions(+), 39 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/fb880cae/storm-core/src/clj/org/apache/storm/trident/testing.clj
----------------------------------------------------------------------


[6/8] storm git commit: Addressing comments and adding a bit more documentation.

Posted by kn...@apache.org.
Addressing comments and adding a bit more documentation.


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/251bc569
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/251bc569
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/251bc569

Branch: refs/heads/master
Commit: 251bc5691eb83506ba5934724311883486a1c9d2
Parents: 5404023
Author: Kyle Nusbaum <Ky...@gmail.com>
Authored: Thu Mar 17 13:28:19 2016 -0500
Committer: Kyle Nusbaum <Ky...@gmail.com>
Committed: Thu Mar 17 13:28:19 2016 -0500

----------------------------------------------------------------------
 .../org/apache/storm/topology/ResourceDeclarer.java |  4 ++++
 .../org/apache/storm/trident/TridentTopology.java   |  2 +-
 .../jvm/org/apache/storm/trident/graph/Group.java   | 16 +++++++---------
 .../storm/trident/operation/ITridentResource.java   |  8 ++++++++
 4 files changed, 20 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/251bc569/storm-core/src/jvm/org/apache/storm/topology/ResourceDeclarer.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/topology/ResourceDeclarer.java b/storm-core/src/jvm/org/apache/storm/topology/ResourceDeclarer.java
index de530b3..4f648eb 100644
--- a/storm-core/src/jvm/org/apache/storm/topology/ResourceDeclarer.java
+++ b/storm-core/src/jvm/org/apache/storm/topology/ResourceDeclarer.java
@@ -17,6 +17,10 @@
  */
 package org.apache.storm.topology;
 
+/**
+ * This is a new base interface that can be used by anything that wants to mirror
+ * RAS's basic API. Trident uses this to allow setting resources in the Stream API.
+ */
 public interface ResourceDeclarer <T extends ResourceDeclarer> {
     T setMemoryLoad(Number onHeap);
     T setMemoryLoad(Number onHeap, Number offHeap);

http://git-wip-us.apache.org/repos/asf/storm/blob/251bc569/storm-core/src/jvm/org/apache/storm/trident/TridentTopology.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/trident/TridentTopology.java b/storm-core/src/jvm/org/apache/storm/trident/TridentTopology.java
index 3aefdc5..e0a349b 100644
--- a/storm-core/src/jvm/org/apache/storm/trident/TridentTopology.java
+++ b/storm-core/src/jvm/org/apache/storm/trident/TridentTopology.java
@@ -484,7 +484,7 @@ public class TridentTopology {
            Right now, this code does not check that. It just takes the max of the summed
            up resource counts for simplicity's sake. We could perform some more complicated
            logic to be more accurate, but the benefits are very small, and only apply to some
-           very odd corner cases. */g
+           very odd corner cases. */
         if(onHeap == null) {
             onHeap = onHeapDefault;
         }

http://git-wip-us.apache.org/repos/asf/storm/blob/251bc569/storm-core/src/jvm/org/apache/storm/trident/graph/Group.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/trident/graph/Group.java b/storm-core/src/jvm/org/apache/storm/trident/graph/Group.java
index a61e3f5..2c92304 100644
--- a/storm-core/src/jvm/org/apache/storm/trident/graph/Group.java
+++ b/storm-core/src/jvm/org/apache/storm/trident/graph/Group.java
@@ -71,16 +71,14 @@ public class Group implements ITridentResource {
     public Map<String, Number> getResources() {
         Map<String, Number> ret = new HashMap<>();
         for(Node n: nodes) {
-            if(n instanceof ITridentResource) {
-                Map<String, Number> res = ((ITridentResource)n).getResources();
-                for(Map.Entry<String, Number> kv : res.entrySet()) {
-                    String key = kv.getKey();
-                    Number val = kv.getValue();
-                    if(ret.containsKey(key)) {
-                        val = new Double(val.doubleValue() + ret.get(key).doubleValue());
-                    }
-                    ret.put(key, val);
+            Map<String, Number> res = n.getResources();
+            for(Map.Entry<String, Number> kv : res.entrySet()) {
+                String key = kv.getKey();
+                Number val = kv.getValue();
+                if(ret.containsKey(key)) {
+                    val = new Double(val.doubleValue() + ret.get(key).doubleValue());
                 }
+                ret.put(key, val);
             }
         }
         return ret;

http://git-wip-us.apache.org/repos/asf/storm/blob/251bc569/storm-core/src/jvm/org/apache/storm/trident/operation/ITridentResource.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/trident/operation/ITridentResource.java b/storm-core/src/jvm/org/apache/storm/trident/operation/ITridentResource.java
index 4b8a047..b3e10ef 100644
--- a/storm-core/src/jvm/org/apache/storm/trident/operation/ITridentResource.java
+++ b/storm-core/src/jvm/org/apache/storm/trident/operation/ITridentResource.java
@@ -19,6 +19,14 @@ package org.apache.storm.trident.operation;
 
 import java.util.Map;
 
+/**
+ * This interface is implemented by various Trident classes in order to
+ * gather and propogate resources that have been set on them.
+ * @see ResourceDeclarer
+ */
 public interface ITridentResource {
+    /**
+     * @return a name of resource name -> amount of that resource. *Return should never be null!*
+     */
     Map<String, Number> getResources();
 }


[2/8] storm git commit: Ready for PR

Posted by kn...@apache.org.
Ready for PR


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/fbfb1ca0
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/fbfb1ca0
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/fbfb1ca0

Branch: refs/heads/master
Commit: fbfb1ca0bb97ac2001d139eab56fef6917680340
Parents: 17a55d2
Author: Kyle Nusbaum <Ky...@gmail.com>
Authored: Wed Mar 9 14:57:30 2016 -0600
Committer: Kyle Nusbaum <Ky...@gmail.com>
Committed: Wed Mar 9 14:57:30 2016 -0600

----------------------------------------------------------------------
 .../clj/org/apache/storm/trident/testing.clj    |  12 +-
 .../jvm/org/apache/storm/trident/Stream.java    |  25 ++++
 .../org/apache/storm/trident/TridentState.java  |  21 +++-
 .../org/apache/storm/trident/planner/Node.java  |   5 +-
 .../apache/storm/trident/integration_test.clj   | 114 ++++++++++++++++---
 5 files changed, 150 insertions(+), 27 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/fbfb1ca0/storm-core/src/clj/org/apache/storm/trident/testing.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/trident/testing.clj b/storm-core/src/clj/org/apache/storm/trident/testing.clj
index 0ec5613..9ddd94b 100644
--- a/storm-core/src/clj/org/apache/storm/trident/testing.clj
+++ b/storm-core/src/clj/org/apache/storm/trident/testing.clj
@@ -56,14 +56,14 @@
      (.shutdown ~drpc)
      ))
 
-(defn with-topology* [cluster topo body-fn]
-  (t/submit-local-topology (:nimbus cluster) "tester" {} (.build topo))
+(defn with-topology* [cluster storm-topo body-fn]
+  (t/submit-local-topology (:nimbus cluster) "tester" {} storm-topo)
   (body-fn)
-  (.killTopologyWithOpts (:nimbus cluster) "tester" (doto (KillOptions.) (.set_wait_secs 0)))
-  )
+  (.killTopologyWithOpts (:nimbus cluster) "tester" (doto (KillOptions.) (.set_wait_secs 0))))
 
-(defmacro with-topology [[cluster topo] & body]
-  `(with-topology* ~cluster ~topo (fn [] ~@body)))
+(defmacro with-topology [[cluster topo storm-topo] & body]
+  `(let [~storm-topo (.build ~topo)]
+     (with-topology* ~cluster ~storm-topo (fn [] ~@body))))
 
 (defn bootstrap-imports []
   (import 'org.apache.storm.LocalDRPC)

http://git-wip-us.apache.org/repos/asf/storm/blob/fbfb1ca0/storm-core/src/jvm/org/apache/storm/trident/Stream.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/trident/Stream.java b/storm-core/src/jvm/org/apache/storm/trident/Stream.java
index d313678..e13cb49 100644
--- a/storm-core/src/jvm/org/apache/storm/trident/Stream.java
+++ b/storm-core/src/jvm/org/apache/storm/trident/Stream.java
@@ -124,6 +124,31 @@ public class Stream implements IAggregatableStream {
     }
 
     /**
+     * Sets the CPU Load resource for the current node
+     */
+    public Stream setCPULoad(Number load) {
+        _node.setCPULoad(load);
+        return this;
+    }
+
+    /**
+     * Sets the Memory Load resources for the current node.
+     * offHeap becomes default
+     */
+    public Stream setMemoryLoad(Number onHeap) {
+        _node.setMemoryLoad(onHeap);
+        return this;
+    }
+
+    /**
+     * Sets the Memory Load resources for the current node
+     */
+    public Stream setMemoryLoad(Number onHeap, Number offHeap) {
+        _node.setMemoryLoad(onHeap, offHeap);
+        return this;
+    }
+
+    /**
      * Filters out fields from a stream, resulting in a Stream containing only the fields specified by `keepFields`.
      *
      * For example, if you had a Stream `mystream` containing the fields `["a", "b", "c","d"]`, calling"

http://git-wip-us.apache.org/repos/asf/storm/blob/fbfb1ca0/storm-core/src/jvm/org/apache/storm/trident/TridentState.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/trident/TridentState.java b/storm-core/src/jvm/org/apache/storm/trident/TridentState.java
index 7173254..fafd5f9 100644
--- a/storm-core/src/jvm/org/apache/storm/trident/TridentState.java
+++ b/storm-core/src/jvm/org/apache/storm/trident/TridentState.java
@@ -23,18 +23,33 @@ import org.apache.storm.trident.planner.Node;
 public class TridentState {
     TridentTopology _topology;
     Node _node;
-    
+
     protected TridentState(TridentTopology topology, Node node) {
         _topology = topology;
         _node = node;
     }
-    
+
     public Stream newValuesStream() {
         return new Stream(_topology, _node.name, _node);
     }
-    
+
     public TridentState parallelismHint(int parallelism) {
         _node.parallelismHint = parallelism;
         return this;
     }
+
+    public TridentState setCPULoad(Number load) {
+        _node.setCPULoad(load);
+        return this;
+    }
+
+    public TridentState setMemoryLoad(Number onHeap) {
+        _node.setMemoryLoad(onHeap);
+        return this;
+    }
+
+    public TridentState setMemoryLoad(Number onHeap, Number offHeap) {
+        _node.setMemoryLoad(onHeap, offHeap);
+        return this;
+    }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/fbfb1ca0/storm-core/src/jvm/org/apache/storm/trident/planner/Node.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/trident/planner/Node.java b/storm-core/src/jvm/org/apache/storm/trident/planner/Node.java
index 64d8a3b..e39ec50 100644
--- a/storm-core/src/jvm/org/apache/storm/trident/planner/Node.java
+++ b/storm-core/src/jvm/org/apache/storm/trident/planner/Node.java
@@ -17,6 +17,7 @@
  */
 package org.apache.storm.trident.planner;
 
+import org.apache.storm.trident.operation.DefaultResourceDeclarer;
 import org.apache.storm.tuple.Fields;
 import java.io.Serializable;
 import java.util.UUID;
@@ -25,7 +26,7 @@ import org.apache.commons.lang.builder.ToStringBuilder;
 import org.apache.commons.lang.builder.ToStringStyle;
 
 
-public class Node implements Serializable {
+public class Node extends DefaultResourceDeclarer implements Serializable {
     private static final AtomicInteger INDEX = new AtomicInteger(0);
     
     private String nodeId;
@@ -62,6 +63,4 @@ public class Node implements Serializable {
     public String toString() {
         return ToStringBuilder.reflectionToString(this, ToStringStyle.MULTI_LINE_STYLE);
     }
-    
-    
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/fbfb1ca0/storm-core/test/clj/integration/org/apache/storm/trident/integration_test.clj
----------------------------------------------------------------------
diff --git a/storm-core/test/clj/integration/org/apache/storm/trident/integration_test.clj b/storm-core/test/clj/integration/org/apache/storm/trident/integration_test.clj
index 57edb70..14e6c5b 100644
--- a/storm-core/test/clj/integration/org/apache/storm/trident/integration_test.clj
+++ b/storm-core/test/clj/integration/org/apache/storm/trident/integration_test.clj
@@ -19,9 +19,13 @@
   (:import [org.apache.storm.trident.testing Split CountAsAggregator StringLength TrueFilter
             MemoryMapState$Factory])
   (:import [org.apache.storm.trident.state StateSpec])
-  (:import [org.apache.storm.trident.operation.impl CombinerAggStateUpdater])
-  (:use [org.apache.storm.trident testing]))
-  
+  (:import [org.apache.storm.trident.operation.impl CombinerAggStateUpdater]
+           [org.apache.storm.trident.operation BaseFunction]
+           [org.json.simple.parser JSONParser]
+           [org.apache.storm Config])
+  (:use [org.apache.storm.trident testing]
+        [org.apache.storm log util config]))
+
 (bootstrap-imports)
 
 (defmacro letlocals
@@ -49,13 +53,13 @@
               (.groupBy (fields "word"))
               (.persistentAggregate (memory-map-state) (Count.) (fields "count"))
               (.parallelismHint 6)
-              ))       
+              ))
         (-> topo
             (.newDRPCStream "all-tuples" drpc)
             (.broadcast)
             (.stateQuery word-counts (fields "args") (TupleCollectionGet.) (fields "word" "count"))
             (.project (fields "word" "count")))
-        (with-topology [cluster topo]
+        (with-topology [cluster topo storm-topo]
           (feed feeder [["hello the man said"] ["the"]])
           (is (= #{["hello" 1] ["said" 1] ["the" 2] ["man" 1]}
                  (into #{} (exec-drpc drpc "all-tuples" "man"))))
@@ -84,7 +88,7 @@
             (.stateQuery word-counts (fields "word") (MapGet.) (fields "count"))
             (.aggregate (fields "count") (Sum.) (fields "sum"))
             (.project (fields "sum")))
-        (with-topology [cluster topo]
+        (with-topology [cluster topo storm-topo]
           (feed feeder [["hello the man said"] ["the"]])
           (is (= [[2]] (exec-drpc drpc "words" "the")))
           (is (= [[1]] (exec-drpc drpc "words" "hello")))
@@ -94,7 +98,7 @@
           (is (= [[8]] (exec-drpc drpc "words" "man where you the")))
           )))))
 
-;; this test reproduces a bug where committer spouts freeze processing when 
+;; this test reproduces a bug where committer spouts freeze processing when
 ;; there's at least one repartitioning after the spout
 (deftest test-word-count-committer-spout
   (t/with-local-cluster [cluster]
@@ -119,7 +123,7 @@
             (.stateQuery word-counts (fields "word") (MapGet.) (fields "count"))
             (.aggregate (fields "count") (Sum.) (fields "sum"))
             (.project (fields "sum")))
-        (with-topology [cluster topo]
+        (with-topology [cluster topo storm-topo]
           (feed feeder [["hello the man said"] ["the"]])
           (is (= [[2]] (exec-drpc drpc "words" "the")))
           (is (= [[1]] (exec-drpc drpc "words" "hello")))
@@ -146,13 +150,13 @@
             (.aggregate (CountAsAggregator.) (fields "count"))
             (.parallelismHint 2) ;;this makes sure batchGlobal is working correctly
             (.project (fields "count")))
-        (with-topology [cluster topo]
+        (with-topology [cluster topo storm-topo]
           (doseq [i (range 100)]
             (is (= [[1]] (exec-drpc drpc "numwords" "the"))))
           (is (= [[0]] (exec-drpc drpc "numwords" "")))
           (is (= [[8]] (exec-drpc drpc "numwords" "1 2 3 4 5 6 7 8")))
           )))))
-          
+
 (deftest test-split-merge
   (t/with-local-cluster [cluster]
     (with-drpc [drpc]
@@ -169,7 +173,7 @@
               (.project (fields "len"))))
 
         (.merge topo [s1 s2])
-        (with-topology [cluster topo]
+        (with-topology [cluster topo storm-topo]
           (is (t/ms= [[7] ["the"] ["man"]] (exec-drpc drpc "splitter" "the man")))
           (is (t/ms= [[5] ["hello"]] (exec-drpc drpc "splitter" "hello")))
           )))))
@@ -191,11 +195,11 @@
               (.aggregate (CountAsAggregator.) (fields "count"))))
 
         (.merge topo [s1 s2])
-        (with-topology [cluster topo]
+        (with-topology [cluster topo storm-topo]
           (is (t/ms= [["the" 1] ["the" 1]] (exec-drpc drpc "tester" "the")))
           (is (t/ms= [["aaaaa" 1] ["aaaaa" 1]] (exec-drpc drpc "tester" "aaaaa")))
           )))))
-          
+
 (deftest test-multi-repartition
   (t/with-local-cluster [cluster]
     (with-drpc [drpc]
@@ -207,7 +211,7 @@
                                    (.shuffle)
                                    (.aggregate (CountAsAggregator.) (fields "count"))
                                    ))
-        (with-topology [cluster topo]
+        (with-topology [cluster topo storm-topo]
           (is (t/ms= [[2]] (exec-drpc drpc "tester" "the man")))
           (is (t/ms= [[1]] (exec-drpc drpc "tester" "aaa")))
           )))))
@@ -281,6 +285,86 @@
                         (.stateQuery word-counts (fields "word1") (MapGet.) (fields "count"))))))
      )))
 
+
+(deftest test-set-component-resources
+  (t/with-local-cluster [cluster]
+    (with-drpc [drpc]
+      (letlocals
+        (bind topo (TridentTopology.))
+        (bind feeder (feeder-spout ["sentence"]))
+        (bind add-bang (proxy [BaseFunction] []
+                         (execute [tuple collector]
+                           (. collector emit (str (. tuple getString 0) "!")))))
+        (bind word-counts
+          (.. topo
+              (newStream "words" feeder)
+              (parallelismHint 5)
+              (setCPULoad 20)
+              (setMemoryLoad 512 256)
+              (each (fields "sentence") (Split.) (fields "word"))
+              (setCPULoad 10)
+              (setMemoryLoad 512)
+              (each (fields "word") add-bang (fields "word!"))
+              (parallelismHint 10)
+              (setCPULoad 50)
+              (setMemoryLoad 1024)
+              (groupBy (fields "word!"))
+              (persistentAggregate (memory-map-state) (Count.) (fields "count"))
+              (setCPULoad 100)
+              (setMemoryLoad 2048)))
+        (with-topology [cluster topo storm-topo]
+;          (log-message "\n")
+;          (log-message "Getting json confs from bolts:")
+;;          (log-message "Bolts: " (. storm-topo get_bolts) "(" (. storm-topo get_bolts_size) ")")
+;          (doall (map (fn [[k v]] (log-message k ":" (.. v get_common get_json_conf))) (. storm-topo get_bolts)))
+
+          (let [parse-fn (fn [[k v]]
+                           [k (clojurify-structure (. (JSONParser.) parse (.. v get_common get_json_conf)))])
+                json-confs (into {} (map parse-fn (. storm-topo get_bolts)))]
+            (testing "spout memory"
+              (is (= (-> (json-confs "spout-words")
+                         (get TOPOLOGY-COMPONENT-RESOURCES-ONHEAP-MEMORY-MB))
+                     512.0))
+
+              (is (= (-> (json-confs "spout-words")
+                         (get TOPOLOGY-COMPONENT-RESOURCES-OFFHEAP-MEMORY-MB))
+                   256.0))
+
+              (is (= (-> (json-confs "$spoutcoord-spout-words")
+                         (get TOPOLOGY-COMPONENT-RESOURCES-ONHEAP-MEMORY-MB))
+                     512.0))
+
+              (is (= (-> (json-confs "$spoutcoord-spout-words")
+                         (get TOPOLOGY-COMPONENT-RESOURCES-OFFHEAP-MEMORY-MB))
+                     256.0)))
+
+            (testing "spout CPU"
+              (is (= (-> (json-confs "spout-words")
+                         (get TOPOLOGY-COMPONENT-CPU-PCORE-PERCENT))
+                     20.0))
+
+              (is (= (-> (json-confs "$spoutcoord-spout-words")
+                       (get TOPOLOGY-COMPONENT-CPU-PCORE-PERCENT))
+                     20.0)))
+
+            (testing "bolt combinations"
+              (is (= (-> (json-confs "b-1")
+                         (get TOPOLOGY-COMPONENT-RESOURCES-ONHEAP-MEMORY-MB))
+                     1536.0))
+
+              (is (= (-> (json-confs "b-1")
+                         (get TOPOLOGY-COMPONENT-CPU-PCORE-PERCENT))
+                     60.0)))
+
+            (testing "aggregations after partition"
+              (is (= (-> (json-confs "b-0")
+                         (get TOPOLOGY-COMPONENT-RESOURCES-ONHEAP-MEMORY-MB))
+                     2048.0))
+
+              (is (= (-> (json-confs "b-0")
+                         (get TOPOLOGY-COMPONENT-CPU-PCORE-PERCENT))
+                     100.0)))))))))
+
 ;; (deftest test-split-merge
 ;;   (t/with-local-cluster [cluster]
 ;;     (with-drpc [drpc]
@@ -295,7 +379,7 @@
 ;;           (-> drpc-stream
 ;;               (.each (fields "args") (StringLength.) (fields "len"))
 ;;               (.project (fields "len"))))
-;; 
+;;
 ;;         (.merge topo [s1 s2])
 ;;         (with-topology [cluster topo]
 ;;           (is (t/ms= [[7] ["the"] ["man"]] (exec-drpc drpc "splitter" "the man")))


[3/8] storm git commit: Addressing comments.

Posted by kn...@apache.org.
Addressing comments.


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/d3323f30
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/d3323f30
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/d3323f30

Branch: refs/heads/master
Commit: d3323f305c4835274a219bbe354597856ce6a1ff
Parents: fbfb1ca
Author: Kyle Nusbaum <Ky...@gmail.com>
Authored: Fri Mar 11 11:22:06 2016 -0600
Committer: Kyle Nusbaum <Ky...@gmail.com>
Committed: Fri Mar 11 11:22:06 2016 -0600

----------------------------------------------------------------------
 storm-core/src/jvm/org/apache/storm/trident/Stream.java       | 6 +++---
 .../src/jvm/org/apache/storm/trident/TridentTopology.java     | 7 +------
 .../integration/org/apache/storm/trident/integration_test.clj | 6 +-----
 3 files changed, 5 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/d3323f30/storm-core/src/jvm/org/apache/storm/trident/Stream.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/trident/Stream.java b/storm-core/src/jvm/org/apache/storm/trident/Stream.java
index e13cb49..b680977 100644
--- a/storm-core/src/jvm/org/apache/storm/trident/Stream.java
+++ b/storm-core/src/jvm/org/apache/storm/trident/Stream.java
@@ -124,7 +124,7 @@ public class Stream implements IAggregatableStream {
     }
 
     /**
-     * Sets the CPU Load resource for the current node
+     * Sets the CPU Load resource for the current operation
      */
     public Stream setCPULoad(Number load) {
         _node.setCPULoad(load);
@@ -132,7 +132,7 @@ public class Stream implements IAggregatableStream {
     }
 
     /**
-     * Sets the Memory Load resources for the current node.
+     * Sets the Memory Load resources for the current operation.
      * offHeap becomes default
      */
     public Stream setMemoryLoad(Number onHeap) {
@@ -141,7 +141,7 @@ public class Stream implements IAggregatableStream {
     }
 
     /**
-     * Sets the Memory Load resources for the current node
+     * Sets the Memory Load resources for the current operation.
      */
     public Stream setMemoryLoad(Number onHeap, Number offHeap) {
         _node.setMemoryLoad(onHeap, offHeap);

http://git-wip-us.apache.org/repos/asf/storm/blob/d3323f30/storm-core/src/jvm/org/apache/storm/trident/TridentTopology.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/trident/TridentTopology.java b/storm-core/src/jvm/org/apache/storm/trident/TridentTopology.java
index 3836663..ccf01dd 100644
--- a/storm-core/src/jvm/org/apache/storm/trident/TridentTopology.java
+++ b/storm-core/src/jvm/org/apache/storm/trident/TridentTopology.java
@@ -401,12 +401,7 @@ public class TridentTopology {
             Integer parallelism = parallelisms.get(grouper.nodeGroup(sn));
 
             Map<String, Number> spoutRes = null;
-            if(sn instanceof ITridentResource) {
-                spoutRes = mergeDefaultResources(((ITridentResource)sn).getResources(), defaults);
-            }
-            else {
-                spoutRes = mergeDefaultResources(null, defaults);
-            }
+            spoutRes = mergeDefaultResources(sn.getResources(), defaults);
             Number onHeap = spoutRes.get(Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB);
             Number offHeap = spoutRes.get(Config.TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB);
             Number cpuLoad = spoutRes.get(Config.TOPOLOGY_COMPONENT_CPU_PCORE_PERCENT);

http://git-wip-us.apache.org/repos/asf/storm/blob/d3323f30/storm-core/test/clj/integration/org/apache/storm/trident/integration_test.clj
----------------------------------------------------------------------
diff --git a/storm-core/test/clj/integration/org/apache/storm/trident/integration_test.clj b/storm-core/test/clj/integration/org/apache/storm/trident/integration_test.clj
index 14e6c5b..c205571 100644
--- a/storm-core/test/clj/integration/org/apache/storm/trident/integration_test.clj
+++ b/storm-core/test/clj/integration/org/apache/storm/trident/integration_test.clj
@@ -313,10 +313,6 @@
               (setCPULoad 100)
               (setMemoryLoad 2048)))
         (with-topology [cluster topo storm-topo]
-;          (log-message "\n")
-;          (log-message "Getting json confs from bolts:")
-;;          (log-message "Bolts: " (. storm-topo get_bolts) "(" (. storm-topo get_bolts_size) ")")
-;          (doall (map (fn [[k v]] (log-message k ":" (.. v get_common get_json_conf))) (. storm-topo get_bolts)))
 
           (let [parse-fn (fn [[k v]]
                            [k (clojurify-structure (. (JSONParser.) parse (.. v get_common get_json_conf)))])
@@ -350,7 +346,7 @@
             (testing "bolt combinations"
               (is (= (-> (json-confs "b-1")
                          (get TOPOLOGY-COMPONENT-RESOURCES-ONHEAP-MEMORY-MB))
-                     1536.0))
+                     (+ 1024.0 512.0)))
 
               (is (= (-> (json-confs "b-1")
                          (get TOPOLOGY-COMPONENT-CPU-PCORE-PERCENT))


[5/8] storm git commit: adding code documentation explaining math of combining component resources.

Posted by kn...@apache.org.
adding code documentation explaining math of combining component resources.


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/5404023a
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/5404023a
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/5404023a

Branch: refs/heads/master
Commit: 5404023ae5372ac84d7f77a2bcc71015cd50d240
Parents: 178dd54
Author: Kyle Nusbaum <Ky...@gmail.com>
Authored: Wed Mar 16 15:52:38 2016 -0500
Committer: Kyle Nusbaum <Ky...@gmail.com>
Committed: Wed Mar 16 15:52:38 2016 -0500

----------------------------------------------------------------------
 .../jvm/org/apache/storm/trident/TridentTopology.java   | 12 ++++++++++++
 1 file changed, 12 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/5404023a/storm-core/src/jvm/org/apache/storm/trident/TridentTopology.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/trident/TridentTopology.java b/storm-core/src/jvm/org/apache/storm/trident/TridentTopology.java
index 6a4e92f..3aefdc5 100644
--- a/storm-core/src/jvm/org/apache/storm/trident/TridentTopology.java
+++ b/storm-core/src/jvm/org/apache/storm/trident/TridentTopology.java
@@ -473,6 +473,18 @@ public class TridentTopology {
         Number offHeap = res.get(Config.TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB);
         Number cpuLoad = res.get(Config.TOPOLOGY_COMPONENT_CPU_PCORE_PERCENT);
 
+        /* We take the max of the default and whatever the user put in here.
+           Each node's resources can be the sum of several operations, so the simplest
+           thing to do is get the max.
+
+           The situation we want to avoid is that the user sets low resources on one
+           node, and when that node is combined with a bunch of others, the sum is still
+           that low resource count. If any component isn't set, we want to use the default.
+
+           Right now, this code does not check that. It just takes the max of the summed
+           up resource counts for simplicity's sake. We could perform some more complicated
+           logic to be more accurate, but the benefits are very small, and only apply to some
+           very odd corner cases. */g
         if(onHeap == null) {
             onHeap = onHeapDefault;
         }


[4/8] storm git commit: Addressing Comments.

Posted by kn...@apache.org.
Addressing Comments.


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/178dd546
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/178dd546
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/178dd546

Branch: refs/heads/master
Commit: 178dd5464790cb8a01c09b43f04ce7dfbc61a332
Parents: d3323f3
Author: Kyle Nusbaum <Ky...@gmail.com>
Authored: Wed Mar 16 15:33:03 2016 -0500
Committer: Kyle Nusbaum <Ky...@gmail.com>
Committed: Wed Mar 16 15:33:03 2016 -0500

----------------------------------------------------------------------
 .../src/jvm/org/apache/storm/trident/Stream.java    |  6 +++++-
 .../jvm/org/apache/storm/trident/TridentState.java  |  6 +++++-
 .../org/apache/storm/trident/TridentTopology.java   |  4 ++--
 .../trident/operation/DefaultResourceDeclarer.java  | 16 ++++++++++------
 .../jvm/org/apache/storm/trident/planner/Node.java  |  2 +-
 5 files changed, 23 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/178dd546/storm-core/src/jvm/org/apache/storm/trident/Stream.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/trident/Stream.java b/storm-core/src/jvm/org/apache/storm/trident/Stream.java
index b680977..4a51b56 100644
--- a/storm-core/src/jvm/org/apache/storm/trident/Stream.java
+++ b/storm-core/src/jvm/org/apache/storm/trident/Stream.java
@@ -20,6 +20,7 @@ package org.apache.storm.trident;
 import org.apache.storm.generated.Grouping;
 import org.apache.storm.generated.NullStruct;
 import org.apache.storm.grouping.CustomStreamGrouping;
+import org.apache.storm.topology.ResourceDeclarer;
 import org.apache.storm.trident.fluent.ChainedAggregatorDeclarer;
 import org.apache.storm.trident.fluent.GlobalAggregationScheme;
 import org.apache.storm.trident.fluent.GroupedStream;
@@ -90,7 +91,7 @@ import java.util.Comparator;
  *
  */
 // TODO: need to be able to replace existing fields with the function fields (like Cascading Fields.REPLACE)
-public class Stream implements IAggregatableStream {
+public class Stream implements IAggregatableStream, ResourceDeclarer<Stream> {
     Node _node;
     TridentTopology _topology;
     String _name;
@@ -126,6 +127,7 @@ public class Stream implements IAggregatableStream {
     /**
      * Sets the CPU Load resource for the current operation
      */
+    @Override
     public Stream setCPULoad(Number load) {
         _node.setCPULoad(load);
         return this;
@@ -135,6 +137,7 @@ public class Stream implements IAggregatableStream {
      * Sets the Memory Load resources for the current operation.
      * offHeap becomes default
      */
+    @Override
     public Stream setMemoryLoad(Number onHeap) {
         _node.setMemoryLoad(onHeap);
         return this;
@@ -143,6 +146,7 @@ public class Stream implements IAggregatableStream {
     /**
      * Sets the Memory Load resources for the current operation.
      */
+    @Override
     public Stream setMemoryLoad(Number onHeap, Number offHeap) {
         _node.setMemoryLoad(onHeap, offHeap);
         return this;

http://git-wip-us.apache.org/repos/asf/storm/blob/178dd546/storm-core/src/jvm/org/apache/storm/trident/TridentState.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/trident/TridentState.java b/storm-core/src/jvm/org/apache/storm/trident/TridentState.java
index fafd5f9..18b60e0 100644
--- a/storm-core/src/jvm/org/apache/storm/trident/TridentState.java
+++ b/storm-core/src/jvm/org/apache/storm/trident/TridentState.java
@@ -17,10 +17,11 @@
  */
 package org.apache.storm.trident;
 
+import org.apache.storm.topology.ResourceDeclarer;
 import org.apache.storm.trident.planner.Node;
 
 
-public class TridentState {
+public class TridentState implements ResourceDeclarer<TridentState> {
     TridentTopology _topology;
     Node _node;
 
@@ -38,16 +39,19 @@ public class TridentState {
         return this;
     }
 
+    @Override
     public TridentState setCPULoad(Number load) {
         _node.setCPULoad(load);
         return this;
     }
 
+    @Override
     public TridentState setMemoryLoad(Number onHeap) {
         _node.setMemoryLoad(onHeap);
         return this;
     }
 
+    @Override
     public TridentState setMemoryLoad(Number onHeap, Number offHeap) {
         _node.setMemoryLoad(onHeap, offHeap);
         return this;

http://git-wip-us.apache.org/repos/asf/storm/blob/178dd546/storm-core/src/jvm/org/apache/storm/trident/TridentTopology.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/trident/TridentTopology.java b/storm-core/src/jvm/org/apache/storm/trident/TridentTopology.java
index ccf01dd..6a4e92f 100644
--- a/storm-core/src/jvm/org/apache/storm/trident/TridentTopology.java
+++ b/storm-core/src/jvm/org/apache/storm/trident/TridentTopology.java
@@ -458,7 +458,7 @@ public class TridentTopology {
     private static Map<String, Number> mergeDefaultResources(Map<String, Number> res, Map defaultConfig) {
         Map<String, Number> ret = new HashMap<String, Number>();
 
-        Number  onHeapDefault = (Number)defaultConfig.get(Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB);
+        Number onHeapDefault = (Number)defaultConfig.get(Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB);
         Number offHeapDefault = (Number)defaultConfig.get(Config.TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB);
         Number cpuLoadDefault = (Number)defaultConfig.get(Config.TOPOLOGY_COMPONENT_CPU_PCORE_PERCENT);
 
@@ -469,7 +469,7 @@ public class TridentTopology {
             return ret;
         }
 
-        Number  onHeap = res.get(Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB);
+        Number onHeap = res.get(Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB);
         Number offHeap = res.get(Config.TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB);
         Number cpuLoad = res.get(Config.TOPOLOGY_COMPONENT_CPU_PCORE_PERCENT);
 

http://git-wip-us.apache.org/repos/asf/storm/blob/178dd546/storm-core/src/jvm/org/apache/storm/trident/operation/DefaultResourceDeclarer.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/trident/operation/DefaultResourceDeclarer.java b/storm-core/src/jvm/org/apache/storm/trident/operation/DefaultResourceDeclarer.java
index 72ca27e..d49011a 100644
--- a/storm-core/src/jvm/org/apache/storm/trident/operation/DefaultResourceDeclarer.java
+++ b/storm-core/src/jvm/org/apache/storm/trident/operation/DefaultResourceDeclarer.java
@@ -23,18 +23,22 @@ import org.apache.storm.Config;
 import org.apache.storm.utils.Utils;
 import org.apache.storm.topology.ResourceDeclarer;
 
-public class DefaultResourceDeclarer implements ResourceDeclarer, ITridentResource {
+/**
+ * @param T Must always be the type of the extending class. i.e.
+ * public class SubResourceDeclarer extends DefaultResourceDeclarer<SubResourceDeclarer> {...}
+ */
+public class DefaultResourceDeclarer<T extends DefaultResourceDeclarer> implements ResourceDeclarer<T>, ITridentResource {
 
     private Map<String, Number> resources = new HashMap<>();
     private Map conf = Utils.readStormConfig();
 
     @Override
-    public DefaultResourceDeclarer setMemoryLoad(Number onHeap) {
+    public T setMemoryLoad(Number onHeap) {
         return setMemoryLoad(onHeap, Utils.getDouble(conf.get(Config.TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB)));
     }
 
     @Override
-    public DefaultResourceDeclarer setMemoryLoad(Number onHeap, Number offHeap) {
+    public T setMemoryLoad(Number onHeap, Number offHeap) {
         if (onHeap != null) {
             onHeap = onHeap.doubleValue();
             resources.put(Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB, onHeap);
@@ -43,16 +47,16 @@ public class DefaultResourceDeclarer implements ResourceDeclarer, ITridentResour
             offHeap = offHeap.doubleValue();
             resources.put(Config.TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB, offHeap);
         }
-        return this;
+        return (T)this;
     }
 
     @Override
-    public DefaultResourceDeclarer setCPULoad(Number amount) {
+    public T setCPULoad(Number amount) {
         if(amount != null) {
             amount = amount.doubleValue();
             resources.put(Config.TOPOLOGY_COMPONENT_CPU_PCORE_PERCENT, amount);
         }
-        return this;
+        return (T)this;
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/storm/blob/178dd546/storm-core/src/jvm/org/apache/storm/trident/planner/Node.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/trident/planner/Node.java b/storm-core/src/jvm/org/apache/storm/trident/planner/Node.java
index e39ec50..b2466e6 100644
--- a/storm-core/src/jvm/org/apache/storm/trident/planner/Node.java
+++ b/storm-core/src/jvm/org/apache/storm/trident/planner/Node.java
@@ -26,7 +26,7 @@ import org.apache.commons.lang.builder.ToStringBuilder;
 import org.apache.commons.lang.builder.ToStringStyle;
 
 
-public class Node extends DefaultResourceDeclarer implements Serializable {
+public class Node extends DefaultResourceDeclarer<Node> implements Serializable {
     private static final AtomicInteger INDEX = new AtomicInteger(0);
     
     private String nodeId;


[8/8] storm git commit: Adding STORM-1616 to CHANGELOG.md

Posted by kn...@apache.org.
Adding STORM-1616 to CHANGELOG.md


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/367464a3
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/367464a3
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/367464a3

Branch: refs/heads/master
Commit: 367464a3d9aa92fca9d64f4e50e780b775279a3a
Parents: fb880ca
Author: Kyle Nusbaum <Ky...@gmail.com>
Authored: Thu Mar 17 15:04:13 2016 -0500
Committer: Kyle Nusbaum <Ky...@gmail.com>
Committed: Thu Mar 17 15:04:13 2016 -0500

----------------------------------------------------------------------
 CHANGELOG.md | 1 +
 1 file changed, 1 insertion(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/367464a3/CHANGELOG.md
----------------------------------------------------------------------
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 0ea7a8d..61ad4df 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,4 +1,5 @@
 ## 2.0.0
+ * STORM-1616: Add RAS API for Trident
  * STORM-1623: nimbus.clj's minor bug
  * STORM-1624: Add maven central status in README
  * STORM-1232: port backtype.storm.scheduler.DefaultScheduler to java