You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by bo...@apache.org on 2015/11/20 23:26:29 UTC
[04/15] storm git commit: Calculate memory usage of each topology and
supervisor for non-RAS schedulers
Calculate memory usage of each topology and supervisor for non-RAS schedulers
Minor change before thrift-9.3
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/ce7085ba
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/ce7085ba
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/ce7085ba
Branch: refs/heads/master
Commit: ce7085bafe0f75a3183977414f2321fdd1b55c94
Parents: 2d993c5
Author: zhuol <zh...@yahoo-inc.com>
Authored: Mon Nov 16 14:11:55 2015 -0600
Committer: zhuol <zh...@yahoo-inc.com>
Committed: Thu Nov 19 12:39:25 2015 -0600
----------------------------------------------------------------------
conf/defaults.yaml | 2 +-
.../src/clj/backtype/storm/daemon/nimbus.clj | 4 +-
storm-core/src/clj/backtype/storm/ui/core.clj | 3 +-
storm-core/src/jvm/backtype/storm/Config.java | 2 +-
.../storm/generated/SupervisorSummary.java | 4 +-
.../jvm/backtype/storm/scheduler/Cluster.java | 75 +++++++++++++++++++-
.../resource/ResourceAwareScheduler.java | 4 +-
.../src/jvm/backtype/storm/utils/Utils.java | 38 ++++++++++
storm-core/src/py/storm/ttypes.py | 4 +-
storm-core/src/ui/public/index.html | 2 -
.../templates/topology-page-template.html | 6 ++
storm-core/src/ui/public/topology.html | 2 +-
12 files changed, 132 insertions(+), 14 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/ce7085ba/conf/defaults.yaml
----------------------------------------------------------------------
diff --git a/conf/defaults.yaml b/conf/defaults.yaml
index af5daa1..b520745 100644
--- a/conf/defaults.yaml
+++ b/conf/defaults.yaml
@@ -74,7 +74,7 @@ topology.max.replication.wait.time.sec: 60
nimbus.credential.renewers.freq.secs: 600
nimbus.impersonation.authorizer: "backtype.storm.security.auth.authorizer.ImpersonationAuthorizer"
-scheduler.resource.display: false
+scheduler.display.resource: false
### ui.* configs are for the master
ui.host: 0.0.0.0
http://git-wip-us.apache.org/repos/asf/storm/blob/ce7085ba/storm-core/src/clj/backtype/storm/daemon/nimbus.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/backtype/storm/daemon/nimbus.clj b/storm-core/src/clj/backtype/storm/daemon/nimbus.clj
index 87b5a8b..176ce6b 100644
--- a/storm-core/src/clj/backtype/storm/daemon/nimbus.clj
+++ b/storm-core/src/clj/backtype/storm/daemon/nimbus.clj
@@ -745,9 +745,11 @@
;; call scheduler.schedule to schedule all the topologies
;; the new assignments for all the topologies are in the cluster object.
_ (.schedule (:scheduler nimbus) topologies cluster)
+ _ (.setResourcesMap cluster @(:id->resources nimbus))
+ _ (if-not (conf SCHEDULER-RESOURCE-DISPLAY) (.updateAssignedMemoryForTopologyAndSupervisor cluster topologies))
_ (reset! (:id->sched-status nimbus) (.getStatusMap cluster))
_ (reset! (:node-id->resources nimbus) (.getSupervisorsResourcesMap cluster))
- _ (reset! (:id->resources nimbus) (merge @(:id->resources nimbus) (.getResourcesMap cluster)))]
+ _ (reset! (:id->resources nimbus) (.getResourcesMap cluster))]
(.getAssignments cluster)))
(defn changed-executors [executor->node+port new-executor->node+port]
http://git-wip-us.apache.org/repos/asf/storm/blob/ce7085ba/storm-core/src/clj/backtype/storm/ui/core.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/backtype/storm/ui/core.clj b/storm-core/src/clj/backtype/storm/ui/core.clj
index 64dc1b9..480dcd3 100644
--- a/storm-core/src/clj/backtype/storm/ui/core.clj
+++ b/storm-core/src/clj/backtype/storm/ui/core.clj
@@ -464,7 +464,7 @@
"schedulerInfo" (.get_sched_status t)
"requestedMemOnHeap" (.get_requested_memonheap t)
"requestedMemOffHeap" (.get_requested_memoffheap t)
- "requestedMem" (+ (.get_requested_memonheap t) (.get_requested_memoffheap t))
+ "requestedTotalMem" (+ (.get_requested_memonheap t) (.get_requested_memoffheap t))
"requestedCpu" (.get_requested_cpu t)
"assignedMemOnHeap" (.get_assigned_memonheap t)
"assignedMemOffHeap" (.get_assigned_memoffheap t)
@@ -592,6 +592,7 @@
"requestedCpu" (.get_requested_cpu topo-info)
"assignedMemOnHeap" (.get_assigned_memonheap topo-info)
"assignedMemOffHeap" (.get_assigned_memoffheap topo-info)
+ "assignedTotalMem" (+ (.get_assigned_memonheap topo-info) (.get_assigned_memoffheap topo-info))
"assignedCpu" (.get_assigned_cpu topo-info)
"topologyStats" topo-stats
"spouts" (map (partial comp-agg-stats-json id secure?)
http://git-wip-us.apache.org/repos/asf/storm/blob/ce7085ba/storm-core/src/jvm/backtype/storm/Config.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/Config.java b/storm-core/src/jvm/backtype/storm/Config.java
index 852757d..afa200b 100644
--- a/storm-core/src/jvm/backtype/storm/Config.java
+++ b/storm-core/src/jvm/backtype/storm/Config.java
@@ -180,7 +180,7 @@ public class Config extends HashMap<String, Object> {
* If this is not set, we will not display resource capacity and usage on the UI.
*/
@isBoolean
- public static final String SCHEDULER_RESOURCE_DISPLAY = "scheduler.resource.display";
+ public static final String SCHEDULER_RESOURCE_DISPLAY = "scheduler.display.resource";
/**
* The mode this Storm cluster is running in. Either "distributed" or "local".
http://git-wip-us.apache.org/repos/asf/storm/blob/ce7085ba/storm-core/src/jvm/backtype/storm/generated/SupervisorSummary.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/generated/SupervisorSummary.java b/storm-core/src/jvm/backtype/storm/generated/SupervisorSummary.java
index 88202d8..edb2016 100644
--- a/storm-core/src/jvm/backtype/storm/generated/SupervisorSummary.java
+++ b/storm-core/src/jvm/backtype/storm/generated/SupervisorSummary.java
@@ -583,10 +583,10 @@ public class SupervisorSummary implements org.apache.thrift.TBase<SupervisorSumm
return get_total_resources();
case USED_MEM:
- return Double.valueOf(get_used_mem());
+ return get_used_mem();
case USED_CPU:
- return Double.valueOf(get_used_cpu());
+ return get_used_cpu();
}
throw new IllegalStateException();
http://git-wip-us.apache.org/repos/asf/storm/blob/ce7085ba/storm-core/src/jvm/backtype/storm/scheduler/Cluster.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/scheduler/Cluster.java b/storm-core/src/jvm/backtype/storm/scheduler/Cluster.java
index c4a815f..ff08ad1 100644
--- a/storm-core/src/jvm/backtype/storm/scheduler/Cluster.java
+++ b/storm-core/src/jvm/backtype/storm/scheduler/Cluster.java
@@ -466,6 +466,75 @@ public class Cluster {
return networkTopography;
}
+ /*
+ * Get heap memory usage for a worker's main process and logwriter process
+ * */
+ private Double getAssignedMemoryForSlot(Map topConf) {
+ Double totalWorkerMemory = 0.0;
+
+ String topology_worker_childopts = Utils.getString(topConf.get(Config.TOPOLOGY_WORKER_CHILDOPTS), null);
+ String worker_childopts = Utils.getString(topConf.get(Config.WORKER_CHILDOPTS), null);
+ Double mem_topology_worker_childopts = Utils.parseWorkerChildOpts(topology_worker_childopts, null);
+ Double mem_worker_childopts = Utils.parseWorkerChildOpts(worker_childopts, null);
+
+ if (mem_topology_worker_childopts != null) {
+ totalWorkerMemory += mem_topology_worker_childopts;
+ } else if (mem_worker_childopts != null) {
+ totalWorkerMemory += mem_worker_childopts;
+ } else {
+ totalWorkerMemory += Utils.getInt(topConf.get(Config.WORKER_HEAP_MEMORY_MB));
+ }
+
+ String topology_worker_lw_childiopts = Utils.getString(topConf.get(Config.TOPOLOGY_WORKER_LOGWRITER_CHILDOPTS), null);
+ if (topology_worker_lw_childiopts != null) {
+ totalWorkerMemory += Utils.parseWorkerChildOpts(topology_worker_lw_childiopts, 0.0);
+ }
+ return totalWorkerMemory;
+ }
+
+ /*
+ * Update memory usage for each topology and each supervisor node after every round of scheduling
+ * */
+ public void updateAssignedMemoryForTopologyAndSupervisor(Topologies topologies) {
+ Map<String, Double> supervisorToAssignedMem = new HashMap<String, Double>();
+
+ for (Map.Entry<String, SchedulerAssignment> entry : this.getAssignments().entrySet()) {
+ String topId = entry.getValue().getTopologyId();
+ Map topConf = topologies.getById(topId).getConf();
+ Double assignedMemForTopology = 0.0;
+ Double assignedMemPerSlot = getAssignedMemoryForSlot(topConf);
+ for (WorkerSlot ws: entry.getValue().getSlots()) {
+ assignedMemForTopology += assignedMemPerSlot;
+ String nodeId = ws.getNodeId();
+ if (supervisorToAssignedMem.containsKey(nodeId)) {
+ supervisorToAssignedMem.put(nodeId, supervisorToAssignedMem.get(nodeId) + assignedMemPerSlot);
+ } else {
+ supervisorToAssignedMem.put(nodeId, assignedMemPerSlot);
+ }
+ }
+ if (this.getResourcesMap().containsKey(topId)) {
+ Double[] topo_resources = getResourcesMap().get(topId);
+ topo_resources[3] = assignedMemForTopology;
+ } else {
+ Double[] topo_resources = {0.0, 0.0, 0.0, 0.0, 0.0, 0.0};
+ topo_resources[3] = assignedMemForTopology;
+ this.setResources(topId, topo_resources);
+ }
+ }
+
+ for (Map.Entry<String, Double> entry : supervisorToAssignedMem.entrySet()) {
+ String nodeId = entry.getKey();
+ if (this.supervisorsResources.containsKey(nodeId)) {
+ Double[] supervisor_resources = supervisorsResources.get(nodeId);
+ supervisor_resources[2] = entry.getValue();
+ } else {
+ Double[] supervisor_resources = {0.0, 0.0, 0.0, 0.0};
+ supervisor_resources[2] = entry.getValue();
+ this.supervisorsResources.put(nodeId, supervisor_resources);
+ }
+ }
+ }
+
public void setStatus(String topologyId, String status) {
this.status.put(topologyId, status);
}
@@ -478,11 +547,15 @@ public class Cluster {
this.resources.put(topologyId, resources);
}
+ public void setResourcesMap(Map<String, Double[]> topologies_resources) {
+ this.resources.putAll(topologies_resources);
+ }
+
public Map<String, Double[]> getResourcesMap() {
return this.resources;
}
- public void setSupervisorsResources(Map<String, Double[]> supervisors_resources) {
+ public void setSupervisorsResourcesMap(Map<String, Double[]> supervisors_resources) {
this.supervisorsResources.putAll(supervisors_resources);
}
http://git-wip-us.apache.org/repos/asf/storm/blob/ce7085ba/storm-core/src/jvm/backtype/storm/scheduler/resource/ResourceAwareScheduler.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/scheduler/resource/ResourceAwareScheduler.java b/storm-core/src/jvm/backtype/storm/scheduler/resource/ResourceAwareScheduler.java
index ad33b43..ac34d44 100644
--- a/storm-core/src/jvm/backtype/storm/scheduler/resource/ResourceAwareScheduler.java
+++ b/storm-core/src/jvm/backtype/storm/scheduler/resource/ResourceAwareScheduler.java
@@ -102,7 +102,7 @@ public class ResourceAwareScheduler implements IScheduler {
Double[] resources = {requestedMemOnHeap, requestedMemOffHeap, requestedCpu,
assignedMemOnHeap, assignedMemOffHeap, assignedCpu};
LOG.debug("setResources for {}: requested on-heap mem, off-heap mem, cpu: {} {} {} " +
- "assigned on-heap mem, off-heap mem, cpu: {} {} {}",
+ "assigned on-heap mem, off-heap mem, cpu: {} {} {}",
td.getId(), requestedMemOnHeap, requestedMemOffHeap, requestedCpu,
assignedMemOnHeap, assignedMemOffHeap, assignedCpu);
cluster.setResources(td.getId(), resources);
@@ -125,7 +125,7 @@ public class ResourceAwareScheduler implements IScheduler {
Double[] resources = {totalMem, totalCpu, usedMem, usedCpu};
supervisors_resources.put(entry.getKey(), resources);
}
- cluster.setSupervisorsResources(supervisors_resources);
+ cluster.setSupervisorsResourcesMap(supervisors_resources);
}
private Map<String, Double> getUserConf() {
http://git-wip-us.apache.org/repos/asf/storm/blob/ce7085ba/storm-core/src/jvm/backtype/storm/utils/Utils.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/utils/Utils.java b/storm-core/src/jvm/backtype/storm/utils/Utils.java
index 00af367..91fdb09 100644
--- a/storm-core/src/jvm/backtype/storm/utils/Utils.java
+++ b/storm-core/src/jvm/backtype/storm/utils/Utils.java
@@ -69,6 +69,8 @@ import java.util.HashSet;
import java.util.HashMap;
import java.util.TreeMap;
import java.util.UUID;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;
import java.util.zip.ZipEntry;
@@ -503,6 +505,18 @@ public class Utils {
}
}
+ public static String getString(Object o, String defaultValue) {
+ if (null == o) {
+ return defaultValue;
+ }
+
+ if (o instanceof String) {
+ return (String) o;
+ } else {
+ throw new IllegalArgumentException("Don't know how to convert " + o + " + to String");
+ }
+ }
+
public static long secureRandomLong() {
return UUID.randomUUID().getLeastSignificantBits();
}
@@ -763,6 +777,30 @@ public class Utils {
public static double zeroIfNaNOrInf(double x) {
return (Double.isNaN(x) || Double.isInfinite(x)) ? 0.0 : x;
+
+ /**
+ * parses the arguments to extract jvm heap memory size.
+ * @param input
+ * @param defaultValue
+ * @return the value of the JVM heap memory setting in a java command.
+ */
+ public static Double parseWorkerChildOpts(String input, Double defaultValue) {
+ if(input != null) {
+ Pattern optsPattern = Pattern.compile("Xmx[0-9]+m");
+ Matcher m = optsPattern.matcher(input);
+ String memoryOpts = null;
+ while (m.find()) {
+ memoryOpts = m.group();
+ }
+ if(memoryOpts!=null) {
+ memoryOpts = memoryOpts.replaceAll("[a-zA-Z]", "");
+ return Double.parseDouble(memoryOpts);
+ } else {
+ return defaultValue;
+ }
+ } else {
+ return defaultValue;
+ }
}
}
http://git-wip-us.apache.org/repos/asf/storm/blob/ce7085ba/storm-core/src/py/storm/ttypes.py
----------------------------------------------------------------------
diff --git a/storm-core/src/py/storm/ttypes.py b/storm-core/src/py/storm/ttypes.py
index ea3c768..4694ba2 100644
--- a/storm-core/src/py/storm/ttypes.py
+++ b/storm-core/src/py/storm/ttypes.py
@@ -2590,12 +2590,12 @@ class SupervisorSummary:
iprot.skip(ftype)
elif fid == 8:
if ftype == TType.DOUBLE:
- self.used_mem = iprot.readDouble();
+ self.used_mem = iprot.readDouble()
else:
iprot.skip(ftype)
elif fid == 9:
if ftype == TType.DOUBLE:
- self.used_cpu = iprot.readDouble();
+ self.used_cpu = iprot.readDouble()
else:
iprot.skip(ftype)
else:
http://git-wip-us.apache.org/repos/asf/storm/blob/ce7085ba/storm-core/src/ui/public/index.html
----------------------------------------------------------------------
diff --git a/storm-core/src/ui/public/index.html b/storm-core/src/ui/public/index.html
index d113e30..454fe64 100644
--- a/storm-core/src/ui/public/index.html
+++ b/storm-core/src/ui/public/index.html
@@ -146,7 +146,6 @@ $(document).ready(function() {
$.getJSON("/api/v1/cluster/configuration", function(json){
var displayResource = json["scheduler.display.resource"];
if (!displayResource){
- $('#topology-summary td:nth-child(9),#topology-summary th:nth-child(9)').hide();
$('#topology-summary td:nth-child(10),#topology-summary th:nth-child(10)').hide();
}
});
@@ -167,7 +166,6 @@ $(document).ready(function() {
var displayResource = json["scheduler.display.resource"];
if (!displayResource){
$('#supervisor-summary td:nth-child(6),#supervisor-summary th:nth-child(6)').hide();
- $('#supervisor-summary td:nth-child(7),#supervisor-summary th:nth-child(7)').hide();
$('#supervisor-summary td:nth-child(8),#supervisor-summary th:nth-child(8)').hide();
$('#supervisor-summary td:nth-child(9),#supervisor-summary th:nth-child(9)').hide();
}
http://git-wip-us.apache.org/repos/asf/storm/blob/ce7085ba/storm-core/src/ui/public/templates/topology-page-template.html
----------------------------------------------------------------------
diff --git a/storm-core/src/ui/public/templates/topology-page-template.html b/storm-core/src/ui/public/templates/topology-page-template.html
index 0297839..1f81f1b 100644
--- a/storm-core/src/ui/public/templates/topology-page-template.html
+++ b/storm-core/src/ui/public/templates/topology-page-template.html
@@ -64,6 +64,11 @@
</span>
</th>
<th>
+ <span data-toggle="tooltip" data-placement="above" title="Assigned Total Memory by Scheduler.">
+ Assigned Mem (MB)
+ </span>
+ </th>
+ <th>
<span data-toggle="tooltip" data-placement="left" title="This shows information from the scheduler about the latest attempt to schedule the Topology on the cluster.">
Scheduler Info
</span>
@@ -81,6 +86,7 @@
<td>{{executorsTotal}}</td>
<td>{{tasksTotal}}</td>
<td>{{replicationCount}}</td>
+ <td>{{assignedTotalMem}}</td>
<td>{{schedulerInfo}}</td>
</tr>
</tbody>
http://git-wip-us.apache.org/repos/asf/storm/blob/ce7085ba/storm-core/src/ui/public/topology.html
----------------------------------------------------------------------
diff --git a/storm-core/src/ui/public/topology.html b/storm-core/src/ui/public/topology.html
index b062196..c5b8684 100644
--- a/storm-core/src/ui/public/topology.html
+++ b/storm-core/src/ui/public/topology.html
@@ -294,7 +294,7 @@ $(document).ready(function() {
topologySummary.append(Mustache.render($(template).filter("#topology-summary-template").html(),response));
topologyResources.append(Mustache.render($(template).filter("#topology-resources-template").html(),response));
$.getJSON("/api/v1/cluster/configuration", function(json){
- var displayResource = json["scheduler.resource.display"];
+ var displayResource = json["scheduler.display.resource"];
if (!displayResource){
$('#topology-resources-header').hide();
$('#topology-resources').hide();