You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by kn...@apache.org on 2015/10/19 22:07:12 UTC
[15/18] storm git commit: Calling Slot Resource Allocation from RAS
Calling Slot Resource Allocation from RAS
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/e50bed80
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/e50bed80
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/e50bed80
Branch: refs/heads/master
Commit: e50bed80bea67ceef43cdca354c24c5eee4dbef4
Parents: f9e095d
Author: zhuol <zh...@yahoo-inc.com>
Authored: Fri Oct 9 13:35:54 2015 -0500
Committer: zhuol <zh...@yahoo-inc.com>
Committed: Mon Oct 19 09:44:12 2015 -0500
----------------------------------------------------------------------
.../clj/backtype/storm/daemon/supervisor.clj | 2 +-
storm-core/src/jvm/backtype/storm/Config.java | 2 +-
.../storm/scheduler/resource/RAS_Node.java | 28 ++++++++++++++++++++
3 files changed, 30 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/e50bed80/storm-core/src/clj/backtype/storm/daemon/supervisor.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/backtype/storm/daemon/supervisor.clj b/storm-core/src/clj/backtype/storm/daemon/supervisor.clj
index 8a844f2..0461cd7 100644
--- a/storm-core/src/clj/backtype/storm/daemon/supervisor.clj
+++ b/storm-core/src/clj/backtype/storm/daemon/supervisor.clj
@@ -66,7 +66,7 @@
(defn- read-my-executors [assignments-snapshot storm-id assignment-id]
(let [assignment (get assignments-snapshot storm-id)
- my-slots-resources (into {}
+ my-slots-resources (into {}
(filter (fn [[[node _] _]] (= node assignment-id))
(:worker->resources assignment)))
my-executors (filter (fn [[_ [node _]]] (= node assignment-id))
http://git-wip-us.apache.org/repos/asf/storm/blob/e50bed80/storm-core/src/jvm/backtype/storm/Config.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/Config.java b/storm-core/src/jvm/backtype/storm/Config.java
index 59dcc53..a1c732d 100644
--- a/storm-core/src/jvm/backtype/storm/Config.java
+++ b/storm-core/src/jvm/backtype/storm/Config.java
@@ -1032,7 +1032,7 @@ public class Config extends HashMap<String, Object> {
public static final String SUPERVISOR_CPU_CAPACITY = "supervisor.cpu.capacity";
/**
- * The jvm opts provided to workers launched by this supervisor.
+ * The jvm opts provided to workers launched by this supervisor.
* All "%ID%", "%WORKER-ID%", "%TOPOLOGY-ID%",
* "%WORKER-PORT%" and "%HEAP-MEM%" substrings are replaced with:
* %ID% -> port (for backward compatibility),
http://git-wip-us.apache.org/repos/asf/storm/blob/e50bed80/storm-core/src/jvm/backtype/storm/scheduler/resource/RAS_Node.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/scheduler/resource/RAS_Node.java b/storm-core/src/jvm/backtype/storm/scheduler/resource/RAS_Node.java
index 3c86528..1f2e795 100644
--- a/storm-core/src/jvm/backtype/storm/scheduler/resource/RAS_Node.java
+++ b/storm-core/src/jvm/backtype/storm/scheduler/resource/RAS_Node.java
@@ -230,6 +230,33 @@ public class RAS_Node {
_topIdToUsedSlots.remove(topId);
}
+ /**
+ * Allocate memory and CPU resources to the assigned slot: sums the on-heap memory, off-heap memory and CPU requirements of the given executors and passes the three totals to the slot.
+ * @param td the TopologyDetails that the slot is assigned to; queried for each executor's resource requirements.
+ * @param executors the executors to run in that slot.
+ * @param slot the slot to allocate resources to.
+ */
+ public void allocateResourceToSlot (TopologyDetails td, Collection<ExecutorDetails> executors, WorkerSlot slot) {
+ double onHeapMem = 0.0;
+ double offHeapMem = 0.0;
+ double cpu = 0.0;
+ for (ExecutorDetails exec : executors) {
+ Double onHeapMemForExec = td.getOnHeapMemoryRequirement(exec);
+ if (onHeapMemForExec != null) { // a null requirement contributes nothing to the total
+ onHeapMem += onHeapMemForExec;
+ }
+ Double offHeapMemForExec = td.getOffHeapMemoryRequirement(exec);
+ if (offHeapMemForExec != null) {
+ offHeapMem += offHeapMemForExec;
+ }
+ Double cpuForExec = td.getTotalCpuReqTask(exec);
+ if (cpuForExec != null) {
+ cpu += cpuForExec;
+ }
+ }
+ slot.allocateResource(onHeapMem, offHeapMem, cpu);
+ }
+
public void assign(WorkerSlot target, TopologyDetails td, Collection<ExecutorDetails> executors,
Cluster cluster) {
if (!_isAlive) {
@@ -248,6 +275,7 @@ public class RAS_Node {
if (!_freeSlots.contains(target)) {
throw new IllegalStateException("Trying to assign already used slot" + target.getPort() + "on node " + _nodeId);
} else {
+ allocateResourceToSlot(td, executors, target);
cluster.assign(target, td.getId(), executors);
assignInternal(target, td.getId(), false);
}