You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by bo...@apache.org on 2015/11/20 23:26:29 UTC

[04/15] storm git commit: Calculate memory usage of each topology and supervisor for non-RAS schedulers

Calculate memory usage of each topology and supervisor for non-RAS schedulers

Minor change before thrift-9.3


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/ce7085ba
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/ce7085ba
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/ce7085ba

Branch: refs/heads/master
Commit: ce7085bafe0f75a3183977414f2321fdd1b55c94
Parents: 2d993c5
Author: zhuol <zh...@yahoo-inc.com>
Authored: Mon Nov 16 14:11:55 2015 -0600
Committer: zhuol <zh...@yahoo-inc.com>
Committed: Thu Nov 19 12:39:25 2015 -0600

----------------------------------------------------------------------
 conf/defaults.yaml                              |  2 +-
 .../src/clj/backtype/storm/daemon/nimbus.clj    |  4 +-
 storm-core/src/clj/backtype/storm/ui/core.clj   |  3 +-
 storm-core/src/jvm/backtype/storm/Config.java   |  2 +-
 .../storm/generated/SupervisorSummary.java      |  4 +-
 .../jvm/backtype/storm/scheduler/Cluster.java   | 75 +++++++++++++++++++-
 .../resource/ResourceAwareScheduler.java        |  4 +-
 .../src/jvm/backtype/storm/utils/Utils.java     | 38 ++++++++++
 storm-core/src/py/storm/ttypes.py               |  4 +-
 storm-core/src/ui/public/index.html             |  2 -
 .../templates/topology-page-template.html       |  6 ++
 storm-core/src/ui/public/topology.html          |  2 +-
 12 files changed, 132 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/ce7085ba/conf/defaults.yaml
----------------------------------------------------------------------
diff --git a/conf/defaults.yaml b/conf/defaults.yaml
index af5daa1..b520745 100644
--- a/conf/defaults.yaml
+++ b/conf/defaults.yaml
@@ -74,7 +74,7 @@ topology.max.replication.wait.time.sec: 60
 nimbus.credential.renewers.freq.secs: 600
 nimbus.impersonation.authorizer: "backtype.storm.security.auth.authorizer.ImpersonationAuthorizer"
 
-scheduler.resource.display: false
+scheduler.display.resource: false
 
 ### ui.* configs are for the master
 ui.host: 0.0.0.0

http://git-wip-us.apache.org/repos/asf/storm/blob/ce7085ba/storm-core/src/clj/backtype/storm/daemon/nimbus.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/backtype/storm/daemon/nimbus.clj b/storm-core/src/clj/backtype/storm/daemon/nimbus.clj
index 87b5a8b..176ce6b 100644
--- a/storm-core/src/clj/backtype/storm/daemon/nimbus.clj
+++ b/storm-core/src/clj/backtype/storm/daemon/nimbus.clj
@@ -745,9 +745,11 @@
         ;; call scheduler.schedule to schedule all the topologies
         ;; the new assignments for all the topologies are in the cluster object.
         _ (.schedule (:scheduler nimbus) topologies cluster)
+        _ (.setResourcesMap cluster @(:id->resources nimbus))
+        _ (if-not (conf SCHEDULER-RESOURCE-DISPLAY) (.updateAssignedMemoryForTopologyAndSupervisor cluster topologies))
         _ (reset! (:id->sched-status nimbus) (.getStatusMap cluster))
         _ (reset! (:node-id->resources nimbus) (.getSupervisorsResourcesMap cluster))
-        _ (reset! (:id->resources nimbus) (merge @(:id->resources nimbus) (.getResourcesMap cluster)))]
+        _ (reset! (:id->resources nimbus) (.getResourcesMap cluster))]
     (.getAssignments cluster)))
 
 (defn changed-executors [executor->node+port new-executor->node+port]

http://git-wip-us.apache.org/repos/asf/storm/blob/ce7085ba/storm-core/src/clj/backtype/storm/ui/core.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/backtype/storm/ui/core.clj b/storm-core/src/clj/backtype/storm/ui/core.clj
index 64dc1b9..480dcd3 100644
--- a/storm-core/src/clj/backtype/storm/ui/core.clj
+++ b/storm-core/src/clj/backtype/storm/ui/core.clj
@@ -464,7 +464,7 @@
        "schedulerInfo" (.get_sched_status t)
        "requestedMemOnHeap" (.get_requested_memonheap t)
        "requestedMemOffHeap" (.get_requested_memoffheap t)
-       "requestedMem" (+ (.get_requested_memonheap t) (.get_requested_memoffheap t))
+       "requestedTotalMem" (+ (.get_requested_memonheap t) (.get_requested_memoffheap t))
        "requestedCpu" (.get_requested_cpu t)
        "assignedMemOnHeap" (.get_assigned_memonheap t)
        "assignedMemOffHeap" (.get_assigned_memoffheap t)
@@ -592,6 +592,7 @@
      "requestedCpu" (.get_requested_cpu topo-info)
      "assignedMemOnHeap" (.get_assigned_memonheap topo-info)
      "assignedMemOffHeap" (.get_assigned_memoffheap topo-info)
+     "assignedTotalMem" (+ (.get_assigned_memonheap topo-info) (.get_assigned_memoffheap topo-info))
      "assignedCpu" (.get_assigned_cpu topo-info)
      "topologyStats" topo-stats
      "spouts" (map (partial comp-agg-stats-json id secure?)

http://git-wip-us.apache.org/repos/asf/storm/blob/ce7085ba/storm-core/src/jvm/backtype/storm/Config.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/Config.java b/storm-core/src/jvm/backtype/storm/Config.java
index 852757d..afa200b 100644
--- a/storm-core/src/jvm/backtype/storm/Config.java
+++ b/storm-core/src/jvm/backtype/storm/Config.java
@@ -180,7 +180,7 @@ public class Config extends HashMap<String, Object> {
      * If this is not set, we will not display resource capacity and usage on the UI.
      */
     @isBoolean
-    public static final String SCHEDULER_RESOURCE_DISPLAY = "scheduler.resource.display";
+    public static final String SCHEDULER_RESOURCE_DISPLAY = "scheduler.display.resource";
 
     /**
      * The mode this Storm cluster is running in. Either "distributed" or "local".

http://git-wip-us.apache.org/repos/asf/storm/blob/ce7085ba/storm-core/src/jvm/backtype/storm/generated/SupervisorSummary.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/generated/SupervisorSummary.java b/storm-core/src/jvm/backtype/storm/generated/SupervisorSummary.java
index 88202d8..edb2016 100644
--- a/storm-core/src/jvm/backtype/storm/generated/SupervisorSummary.java
+++ b/storm-core/src/jvm/backtype/storm/generated/SupervisorSummary.java
@@ -583,10 +583,10 @@ public class SupervisorSummary implements org.apache.thrift.TBase<SupervisorSumm
       return get_total_resources();
 
     case USED_MEM:
-      return Double.valueOf(get_used_mem());
+      return get_used_mem();
 
     case USED_CPU:
-      return Double.valueOf(get_used_cpu());
+      return get_used_cpu();
 
     }
     throw new IllegalStateException();

http://git-wip-us.apache.org/repos/asf/storm/blob/ce7085ba/storm-core/src/jvm/backtype/storm/scheduler/Cluster.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/scheduler/Cluster.java b/storm-core/src/jvm/backtype/storm/scheduler/Cluster.java
index c4a815f..ff08ad1 100644
--- a/storm-core/src/jvm/backtype/storm/scheduler/Cluster.java
+++ b/storm-core/src/jvm/backtype/storm/scheduler/Cluster.java
@@ -466,6 +466,75 @@ public class Cluster {
         return networkTopography;
     }
 
+    /*
+    * Get heap memory usage for a worker's main process and logwriter process
+    * */
+    private Double getAssignedMemoryForSlot(Map topConf) {
+        Double totalWorkerMemory = 0.0;
+
+        String topology_worker_childopts = Utils.getString(topConf.get(Config.TOPOLOGY_WORKER_CHILDOPTS), null);
+        String worker_childopts = Utils.getString(topConf.get(Config.WORKER_CHILDOPTS), null);
+        Double mem_topology_worker_childopts = Utils.parseWorkerChildOpts(topology_worker_childopts, null);
+        Double mem_worker_childopts = Utils.parseWorkerChildOpts(worker_childopts, null);
+
+        if (mem_topology_worker_childopts != null) {
+            totalWorkerMemory += mem_topology_worker_childopts;
+        } else if (mem_worker_childopts != null) {
+            totalWorkerMemory += mem_worker_childopts;
+        } else {
+            totalWorkerMemory += Utils.getInt(topConf.get(Config.WORKER_HEAP_MEMORY_MB));
+        }
+
+        String topology_worker_lw_childiopts = Utils.getString(topConf.get(Config.TOPOLOGY_WORKER_LOGWRITER_CHILDOPTS), null);
+        if (topology_worker_lw_childiopts != null) {
+            totalWorkerMemory += Utils.parseWorkerChildOpts(topology_worker_lw_childiopts, 0.0);
+        }
+        return totalWorkerMemory;
+    }
+
+    /*
+    * Update memory usage for each topology and each supervisor node after every round of scheduling
+    * */
+    public void updateAssignedMemoryForTopologyAndSupervisor(Topologies topologies) {
+        Map<String, Double> supervisorToAssignedMem = new HashMap<String, Double>();
+
+        for (Map.Entry<String, SchedulerAssignment> entry : this.getAssignments().entrySet()) {
+            String topId = entry.getValue().getTopologyId();
+            Map topConf = topologies.getById(topId).getConf();
+            Double assignedMemForTopology = 0.0;
+            Double assignedMemPerSlot = getAssignedMemoryForSlot(topConf);
+            for (WorkerSlot ws: entry.getValue().getSlots()) {
+                assignedMemForTopology += assignedMemPerSlot;
+                String nodeId = ws.getNodeId();
+                if (supervisorToAssignedMem.containsKey(nodeId)) {
+                    supervisorToAssignedMem.put(nodeId, supervisorToAssignedMem.get(nodeId) + assignedMemPerSlot);
+                } else {
+                    supervisorToAssignedMem.put(nodeId, assignedMemPerSlot);
+                }
+            }
+            if (this.getResourcesMap().containsKey(topId)) {
+                Double[] topo_resources = getResourcesMap().get(topId);
+                topo_resources[3] = assignedMemForTopology;
+            } else {
+                Double[] topo_resources = {0.0, 0.0, 0.0, 0.0, 0.0, 0.0};
+                topo_resources[3] = assignedMemForTopology;
+                this.setResources(topId, topo_resources);
+            }
+        }
+
+        for (Map.Entry<String, Double> entry : supervisorToAssignedMem.entrySet()) {
+            String nodeId = entry.getKey();
+            if (this.supervisorsResources.containsKey(nodeId)) {
+                Double[] supervisor_resources = supervisorsResources.get(nodeId);
+                supervisor_resources[2] = entry.getValue();
+            } else {
+                Double[] supervisor_resources = {0.0, 0.0, 0.0, 0.0};
+                supervisor_resources[2] = entry.getValue();
+                this.supervisorsResources.put(nodeId, supervisor_resources);
+            }
+        }
+    }
+
     public void setStatus(String topologyId, String status) {
         this.status.put(topologyId, status);
     }
@@ -478,11 +547,15 @@ public class Cluster {
         this.resources.put(topologyId, resources);
     }
 
+    public void setResourcesMap(Map<String, Double[]> topologies_resources) {
+        this.resources.putAll(topologies_resources);
+    }
+
     public Map<String, Double[]> getResourcesMap() {
         return this.resources;
     }
 
-    public void setSupervisorsResources(Map<String, Double[]> supervisors_resources) {
+    public void setSupervisorsResourcesMap(Map<String, Double[]> supervisors_resources) {
         this.supervisorsResources.putAll(supervisors_resources);
     }
 

http://git-wip-us.apache.org/repos/asf/storm/blob/ce7085ba/storm-core/src/jvm/backtype/storm/scheduler/resource/ResourceAwareScheduler.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/scheduler/resource/ResourceAwareScheduler.java b/storm-core/src/jvm/backtype/storm/scheduler/resource/ResourceAwareScheduler.java
index ad33b43..ac34d44 100644
--- a/storm-core/src/jvm/backtype/storm/scheduler/resource/ResourceAwareScheduler.java
+++ b/storm-core/src/jvm/backtype/storm/scheduler/resource/ResourceAwareScheduler.java
@@ -102,7 +102,7 @@ public class ResourceAwareScheduler implements IScheduler {
                 Double[] resources = {requestedMemOnHeap, requestedMemOffHeap, requestedCpu,
                         assignedMemOnHeap, assignedMemOffHeap, assignedCpu};
                 LOG.debug("setResources for {}: requested on-heap mem, off-heap mem, cpu: {} {} {} " +
-                                "assigned on-heap mem, off-heap mem, cpu: {} {} {}",
+                        "assigned on-heap mem, off-heap mem, cpu: {} {} {}",
                         td.getId(), requestedMemOnHeap, requestedMemOffHeap, requestedCpu,
                         assignedMemOnHeap, assignedMemOffHeap, assignedCpu);
                 cluster.setResources(td.getId(), resources);
@@ -125,7 +125,7 @@ public class ResourceAwareScheduler implements IScheduler {
             Double[] resources = {totalMem, totalCpu, usedMem, usedCpu};
             supervisors_resources.put(entry.getKey(), resources);
         }
-        cluster.setSupervisorsResources(supervisors_resources);
+        cluster.setSupervisorsResourcesMap(supervisors_resources);
     }
 
     private Map<String, Double> getUserConf() {

http://git-wip-us.apache.org/repos/asf/storm/blob/ce7085ba/storm-core/src/jvm/backtype/storm/utils/Utils.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/utils/Utils.java b/storm-core/src/jvm/backtype/storm/utils/Utils.java
index 00af367..91fdb09 100644
--- a/storm-core/src/jvm/backtype/storm/utils/Utils.java
+++ b/storm-core/src/jvm/backtype/storm/utils/Utils.java
@@ -69,6 +69,8 @@ import java.util.HashSet;
 import java.util.HashMap;
 import java.util.TreeMap;
 import java.util.UUID;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
 import java.util.zip.GZIPInputStream;
 import java.util.zip.GZIPOutputStream;
 import java.util.zip.ZipEntry;
@@ -503,6 +505,18 @@ public class Utils {
         }
     }
 
+    public static String getString(Object o, String defaultValue) {
+        if (null == o) {
+            return defaultValue;
+        }
+
+        if (o instanceof String) {
+            return (String) o;
+        } else {
+            throw new IllegalArgumentException("Don't know how to convert " + o + " + to String");
+        }
+    }
+
     public static long secureRandomLong() {
         return UUID.randomUUID().getLeastSignificantBits();
     }
@@ -763,6 +777,30 @@ public class Utils {
 
     public static double zeroIfNaNOrInf(double x) {
         return (Double.isNaN(x) || Double.isInfinite(x)) ? 0.0 : x;
+
+    /**
+     * parses the arguments to extract jvm heap memory size.
+     * @param input
+     * @param defaultValue
+     * @return the value of the JVM heap memory setting in a java command.
+     */
+    public static Double parseWorkerChildOpts(String input, Double defaultValue) {
+        if(input != null) {
+            Pattern optsPattern = Pattern.compile("Xmx[0-9]+m");
+            Matcher m = optsPattern.matcher(input);
+            String memoryOpts = null;
+            while (m.find()) {
+                memoryOpts = m.group();
+            }
+            if(memoryOpts!=null) {
+                memoryOpts = memoryOpts.replaceAll("[a-zA-Z]", "");
+                return Double.parseDouble(memoryOpts);
+            } else {
+                return defaultValue;
+            }
+        } else {
+            return defaultValue;
+        }
     }
 }
 

http://git-wip-us.apache.org/repos/asf/storm/blob/ce7085ba/storm-core/src/py/storm/ttypes.py
----------------------------------------------------------------------
diff --git a/storm-core/src/py/storm/ttypes.py b/storm-core/src/py/storm/ttypes.py
index ea3c768..4694ba2 100644
--- a/storm-core/src/py/storm/ttypes.py
+++ b/storm-core/src/py/storm/ttypes.py
@@ -2590,12 +2590,12 @@ class SupervisorSummary:
           iprot.skip(ftype)
       elif fid == 8:
         if ftype == TType.DOUBLE:
-          self.used_mem = iprot.readDouble();
+          self.used_mem = iprot.readDouble()
         else:
           iprot.skip(ftype)
       elif fid == 9:
         if ftype == TType.DOUBLE:
-          self.used_cpu = iprot.readDouble();
+          self.used_cpu = iprot.readDouble()
         else:
           iprot.skip(ftype)
       else:

http://git-wip-us.apache.org/repos/asf/storm/blob/ce7085ba/storm-core/src/ui/public/index.html
----------------------------------------------------------------------
diff --git a/storm-core/src/ui/public/index.html b/storm-core/src/ui/public/index.html
index d113e30..454fe64 100644
--- a/storm-core/src/ui/public/index.html
+++ b/storm-core/src/ui/public/index.html
@@ -146,7 +146,6 @@ $(document).ready(function() {
           $.getJSON("/api/v1/cluster/configuration", function(json){
               var displayResource = json["scheduler.display.resource"];
               if (!displayResource){
-                  $('#topology-summary td:nth-child(9),#topology-summary th:nth-child(9)').hide();
                   $('#topology-summary td:nth-child(10),#topology-summary th:nth-child(10)').hide();
               }
           });
@@ -167,7 +166,6 @@ $(document).ready(function() {
               var displayResource = json["scheduler.display.resource"];
               if (!displayResource){
                   $('#supervisor-summary td:nth-child(6),#supervisor-summary th:nth-child(6)').hide();
-                  $('#supervisor-summary td:nth-child(7),#supervisor-summary th:nth-child(7)').hide();
                   $('#supervisor-summary td:nth-child(8),#supervisor-summary th:nth-child(8)').hide();
                   $('#supervisor-summary td:nth-child(9),#supervisor-summary th:nth-child(9)').hide();
               }

http://git-wip-us.apache.org/repos/asf/storm/blob/ce7085ba/storm-core/src/ui/public/templates/topology-page-template.html
----------------------------------------------------------------------
diff --git a/storm-core/src/ui/public/templates/topology-page-template.html b/storm-core/src/ui/public/templates/topology-page-template.html
index 0297839..1f81f1b 100644
--- a/storm-core/src/ui/public/templates/topology-page-template.html
+++ b/storm-core/src/ui/public/templates/topology-page-template.html
@@ -64,6 +64,11 @@
           </span>
         </th>
         <th>
+          <span data-toggle="tooltip" data-placement="above" title="Assigned Total Memory by Scheduler.">
+            Assigned Mem (MB)
+          </span>
+        </th>
+        <th>
           <span data-toggle="tooltip" data-placement="left" title="This shows information from the scheduler about the latest attempt to schedule the Topology on the cluster.">
             Scheduler Info
           </span>
@@ -81,6 +86,7 @@
         <td>{{executorsTotal}}</td>
         <td>{{tasksTotal}}</td>
         <td>{{replicationCount}}</td>
+        <td>{{assignedTotalMem}}</td>
         <td>{{schedulerInfo}}</td>
       </tr>
     </tbody>

http://git-wip-us.apache.org/repos/asf/storm/blob/ce7085ba/storm-core/src/ui/public/topology.html
----------------------------------------------------------------------
diff --git a/storm-core/src/ui/public/topology.html b/storm-core/src/ui/public/topology.html
index b062196..c5b8684 100644
--- a/storm-core/src/ui/public/topology.html
+++ b/storm-core/src/ui/public/topology.html
@@ -294,7 +294,7 @@ $(document).ready(function() {
             topologySummary.append(Mustache.render($(template).filter("#topology-summary-template").html(),response));
             topologyResources.append(Mustache.render($(template).filter("#topology-resources-template").html(),response));
             $.getJSON("/api/v1/cluster/configuration", function(json){
-              var displayResource = json["scheduler.resource.display"];
+              var displayResource = json["scheduler.display.resource"];
               if (!displayResource){
                   $('#topology-resources-header').hide();
                   $('#topology-resources').hide();