You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by kn...@apache.org on 2015/10/19 22:07:12 UTC

[15/18] storm git commit: Calling Slot Resource Allocation from RAS

Calling Slot Resource Allocation from RAS


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/e50bed80
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/e50bed80
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/e50bed80

Branch: refs/heads/master
Commit: e50bed80bea67ceef43cdca354c24c5eee4dbef4
Parents: f9e095d
Author: zhuol <zh...@yahoo-inc.com>
Authored: Fri Oct 9 13:35:54 2015 -0500
Committer: zhuol <zh...@yahoo-inc.com>
Committed: Mon Oct 19 09:44:12 2015 -0500

----------------------------------------------------------------------
 .../clj/backtype/storm/daemon/supervisor.clj    |  2 +-
 storm-core/src/jvm/backtype/storm/Config.java   |  2 +-
 .../storm/scheduler/resource/RAS_Node.java      | 28 ++++++++++++++++++++
 3 files changed, 30 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/e50bed80/storm-core/src/clj/backtype/storm/daemon/supervisor.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/backtype/storm/daemon/supervisor.clj b/storm-core/src/clj/backtype/storm/daemon/supervisor.clj
index 8a844f2..0461cd7 100644
--- a/storm-core/src/clj/backtype/storm/daemon/supervisor.clj
+++ b/storm-core/src/clj/backtype/storm/daemon/supervisor.clj
@@ -66,7 +66,7 @@
 
 (defn- read-my-executors [assignments-snapshot storm-id assignment-id]
   (let [assignment (get assignments-snapshot storm-id)
-        my-slots-resources (into {} 
+        my-slots-resources (into {}
                                  (filter (fn [[[node _] _]] (= node assignment-id))
                                          (:worker->resources assignment)))
         my-executors (filter (fn [[_ [node _]]] (= node assignment-id))

http://git-wip-us.apache.org/repos/asf/storm/blob/e50bed80/storm-core/src/jvm/backtype/storm/Config.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/Config.java b/storm-core/src/jvm/backtype/storm/Config.java
index 59dcc53..a1c732d 100644
--- a/storm-core/src/jvm/backtype/storm/Config.java
+++ b/storm-core/src/jvm/backtype/storm/Config.java
@@ -1032,7 +1032,7 @@ public class Config extends HashMap<String, Object> {
     public static final String SUPERVISOR_CPU_CAPACITY = "supervisor.cpu.capacity";
 
     /**
-     * The jvm opts provided to workers launched by this supervisor. 
+     * The jvm opts provided to workers launched by this supervisor.
      * All "%ID%", "%WORKER-ID%", "%TOPOLOGY-ID%",
      * "%WORKER-PORT%" and "%HEAP-MEM%" substrings are replaced with:
      * %ID%          -> port (for backward compatibility),

http://git-wip-us.apache.org/repos/asf/storm/blob/e50bed80/storm-core/src/jvm/backtype/storm/scheduler/resource/RAS_Node.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/scheduler/resource/RAS_Node.java b/storm-core/src/jvm/backtype/storm/scheduler/resource/RAS_Node.java
index 3c86528..1f2e795 100644
--- a/storm-core/src/jvm/backtype/storm/scheduler/resource/RAS_Node.java
+++ b/storm-core/src/jvm/backtype/storm/scheduler/resource/RAS_Node.java
@@ -230,6 +230,33 @@ public class RAS_Node {
         _topIdToUsedSlots.remove(topId);
     }
 
+    /**
+     * Allocate the summed on-heap Mem, off-heap Mem and CPU requirements of the executors (null values are skipped) to the assigned slot.
+     * @param td the TopologyDetails that the slot is assigned to; it is queried for each executor's requirements.
+     * @param executors the executors to run in that slot.
+     * @param slot the slot to allocate resource to; the three totals are passed to its allocateResource method.
+     */
+    public void allocateResourceToSlot (TopologyDetails td, Collection<ExecutorDetails> executors, WorkerSlot slot) {
+        double onHeapMem = 0.0;
+        double offHeapMem = 0.0;
+        double cpu = 0.0;
+        for (ExecutorDetails exec : executors) {
+            Double onHeapMemForExec = td.getOnHeapMemoryRequirement(exec);
+            if (onHeapMemForExec != null) {
+                onHeapMem += onHeapMemForExec;
+            }
+            Double offHeapMemForExec = td.getOffHeapMemoryRequirement(exec);
+            if (offHeapMemForExec != null) {
+                offHeapMem += offHeapMemForExec;
+            }
+            Double cpuForExec = td.getTotalCpuReqTask(exec);
+            if (cpuForExec != null) {
+                cpu += cpuForExec;
+            }
+        }
+        slot.allocateResource(onHeapMem, offHeapMem, cpu);
+    }
+
     public void assign(WorkerSlot target, TopologyDetails td, Collection<ExecutorDetails> executors,
                        Cluster cluster) {
         if (!_isAlive) {
@@ -248,6 +275,7 @@ public class RAS_Node {
         if (!_freeSlots.contains(target)) {
             throw new IllegalStateException("Trying to assign already used slot" + target.getPort() + "on node " + _nodeId);
         } else {
+            allocateResourceToSlot(td, executors, target);
             cluster.assign(target, td.getId(), executors);
             assignInternal(target, td.getId(), false);
         }